Merge "Update retype API to use versionedobjects"

This commit is contained in:
Jenkins 2015-11-13 04:52:33 +00:00 committed by Gerrit Code Review
commit 8d0e9f381a
9 changed files with 184 additions and 99 deletions

View File

@ -56,7 +56,7 @@ LOG = logging.getLogger(__name__)
class SchedulerManager(manager.Manager):
"""Chooses a host to create volumes."""
RPC_API_VERSION = '1.9'
RPC_API_VERSION = '1.10'
target = messaging.Target(version=RPC_API_VERSION)
@ -182,7 +182,7 @@ class SchedulerManager(manager.Manager):
force_host_copy)
def retype(self, context, topic, volume_id,
request_spec, filter_properties=None):
request_spec, filter_properties=None, volume=None):
"""Schedule the modification of a volume's type.
:param context: the request context
@ -190,10 +190,17 @@ class SchedulerManager(manager.Manager):
:param volume_id: the ID of the volume to retype
:param request_spec: parameters for this retype request
:param filter_properties: parameters to filter by
:param volume: the volume object to retype
"""
self._wait_for_scheduler()
# FIXME(thangp): Remove this in v2.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the
# volume by its volume_id.
volume = objects.Volume.get_by_id(context, volume_id)
def _retype_volume_set_error(self, context, ex, request_spec,
volume_ref, msg, reservations):
if reservations:
@ -204,14 +211,13 @@ class SchedulerManager(manager.Manager):
self._set_volume_state_and_notify('retype', volume_state,
context, ex, request_spec, msg)
volume_ref = db.volume_get(context, volume_id)
reservations = request_spec.get('quota_reservations')
new_type = request_spec.get('volume_type')
if new_type is None:
msg = _('New volume type not specified in request_spec.')
ex = exception.ParameterNotFound(param='volume_type')
_retype_volume_set_error(self, context, ex, request_spec,
volume_ref, msg, reservations)
volume, msg, reservations)
# Default migration policy is 'never'
migration_policy = request_spec.get('migration_policy')
@ -225,15 +231,15 @@ class SchedulerManager(manager.Manager):
except exception.NoValidHost as ex:
msg = (_("Could not find a host for volume %(volume_id)s with "
"type %(type_id)s.") %
{'type_id': new_type['id'], 'volume_id': volume_id})
{'type_id': new_type['id'], 'volume_id': volume.id})
_retype_volume_set_error(self, context, ex, request_spec,
volume_ref, msg, reservations)
volume, msg, reservations)
except Exception as ex:
with excutils.save_and_reraise_exception():
_retype_volume_set_error(self, context, ex, request_spec,
volume_ref, None, reservations)
volume, None, reservations)
else:
volume_rpcapi.VolumeAPI().retype(context, volume_ref,
volume_rpcapi.VolumeAPI().retype(context, volume,
new_type['id'], tgt_host,
migration_policy, reservations)

View File

@ -43,6 +43,7 @@ class SchedulerAPI(object):
1.7 - Add get_active_pools method
1.8 - Add sending object over RPC in create_consistencygroup method
1.9 - Adds support for sending objects over RPC in create_volume()
1.10 - Adds support for sending objects over RPC in retype()
"""
RPC_API_VERSION = '1.0'
@ -107,15 +108,20 @@ class SchedulerAPI(object):
filter_properties=filter_properties)
def retype(self, ctxt, topic, volume_id,
request_spec=None, filter_properties=None):
request_spec=None, filter_properties=None, volume=None):
cctxt = self.client.prepare(version='1.4')
request_spec_p = jsonutils.to_primitive(request_spec)
return cctxt.cast(ctxt, 'retype',
topic=topic,
volume_id=volume_id,
request_spec=request_spec_p,
filter_properties=filter_properties)
msg_args = {'topic': topic, 'volume_id': volume_id,
'request_spec': request_spec_p,
'filter_properties': filter_properties}
if self.client.can_send_version('1.10'):
version = '1.10'
msg_args['volume'] = volume
else:
version = '1.4'
cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'retype', **msg_args)
def manage_existing(self, ctxt, topic, volume_id,
request_spec=None, filter_properties=None):

View File

@ -128,14 +128,31 @@ class SchedulerRpcAPITestCase(test.TestCase):
filter_properties='filter_properties',
version='1.3')
def test_retype(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
def test_retype(self, can_send_version):
self._test_scheduler_api('retype',
rpc_method='cast',
topic='topic',
volume_id='volume_id',
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
version='1.10')
can_send_version.assert_called_with('1.10')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
def test_retype_old(self, can_send_version):
self._test_scheduler_api('retype',
rpc_method='cast',
topic='topic',
volume_id='volume_id',
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
version='1.4')
can_send_version.assert_called_with('1.10')
def test_manage_existing(self):
self._test_scheduler_api('manage_existing',

View File

@ -225,37 +225,37 @@ class SchedulerManagerTestCase(test.TestCase):
request_spec, {})
@mock.patch('cinder.db.volume_update')
@mock.patch('cinder.db.volume_get')
def test_retype_volume_exception_returns_volume_state(self, _mock_vol_get,
_mock_vol_update):
@mock.patch('cinder.db.volume_attachment_get_used_by_volume_id')
def test_retype_volume_exception_returns_volume_state(
self, _mock_vol_attachment_get, _mock_vol_update):
# Test NoValidHost exception behavior for retype.
# Puts the volume in original state and eats the exception.
volume = tests_utils.create_volume(self.context,
status='retyping',
previous_status='in-use')
instance_uuid = '12345678-1234-5678-1234-567812345678'
volume = tests_utils.attach_volume(self.context, volume['id'],
instance_uuid, None, '/dev/fake')
fake_volume_id = volume.id
volume_attach = tests_utils.attach_volume(self.context, volume.id,
instance_uuid, None,
'/dev/fake')
_mock_vol_attachment_get.return_value = [volume_attach]
topic = 'fake_topic'
request_spec = {'volume_id': fake_volume_id, 'volume_type': {'id': 3},
request_spec = {'volume_id': volume.id, 'volume_type': {'id': 3},
'migration_policy': 'on-demand'}
_mock_vol_get.return_value = volume
_mock_vol_update.return_value = {'status': 'in-use'}
_mock_find_retype_host = mock.Mock(
side_effect=exception.NoValidHost(reason=""))
orig_retype = self.manager.driver.find_retype_host
self.manager.driver.find_retype_host = _mock_find_retype_host
self.manager.retype(self.context, topic, fake_volume_id,
self.manager.retype(self.context, topic, volume.id,
request_spec=request_spec,
filter_properties={})
filter_properties={},
volume=volume)
_mock_vol_get.assert_called_once_with(self.context, fake_volume_id)
_mock_find_retype_host.assert_called_once_with(self.context,
request_spec, {},
'on-demand')
_mock_vol_update.assert_called_once_with(self.context, fake_volume_id,
_mock_vol_update.assert_called_once_with(self.context, volume.id,
{'status': 'in-use'})
self.manager.driver.find_retype_host = orig_retype

View File

@ -683,6 +683,17 @@ class VolumeTestCase(BaseVolumeTestCase):
False,
FAKE_METADATA_TYPE.fake_type)
@mock.patch('cinder.db.volume_update')
def test_update_with_ovo(self, volume_update):
"""Test update volume using oslo_versionedobject."""
volume = tests_utils.create_volume(self.context, **self.volume_params)
volume_api = cinder.volume.api.API()
updates = {'display_name': 'foobbar'}
volume_api.update(self.context, volume, updates)
volume_update.assert_called_once_with(self.context, volume.id,
updates)
self.assertEqual('foobbar', volume.display_name)
def test_delete_volume_metadata_with_metatype(self):
"""Test delete volume metadata with different metadata type."""
test_meta1 = {'fake_key1': 'fake_value1', 'fake_key2': 'fake_value2'}
@ -3908,7 +3919,6 @@ class VolumeTestCase(BaseVolumeTestCase):
status='creating', host=CONF.host)
self.volume.create_volume(self.context, volume['id'])
volume['status'] = 'in-use'
volume['host'] = 'fakehost'
volume_api = cinder.volume.api.API()
@ -4753,15 +4763,16 @@ class VolumeMigrationTestCase(VolumeTestCase):
host=CONF.host, status='retyping',
volume_type_id=old_vol_type['id'],
replication_status=rep_status)
volume['previous_status'] = 'available'
volume.previous_status = 'available'
volume.save()
if snap:
self._create_snapshot(volume['id'], size=volume['size'])
self._create_snapshot(volume.id, size=volume.size)
if driver or diff_equal:
host_obj = {'host': CONF.host, 'capabilities': {}}
else:
host_obj = {'host': 'newhost', 'capabilities': {}}
reserve_opts = {'volumes': 1, 'gigabytes': volume['size']}
reserve_opts = {'volumes': 1, 'gigabytes': volume.size}
QUOTAS.add_volume_type_opts(self.context,
reserve_opts,
vol_type['id'])
@ -4782,20 +4793,21 @@ class VolumeMigrationTestCase(VolumeTestCase):
_mig.return_value = True
if not exc:
self.volume.retype(self.context, volume['id'],
self.volume.retype(self.context, volume.id,
vol_type['id'], host_obj,
migration_policy=policy,
reservations=reservations)
reservations=reservations,
volume=volume)
else:
self.assertRaises(exc, self.volume.retype,
self.context, volume['id'],
self.context, volume.id,
vol_type['id'], host_obj,
migration_policy=policy,
reservations=reservations)
get_volume.assert_called_once_with(self.context, volume['id'])
reservations=reservations,
volume=volume)
# get volume/quota properties
volume = db.volume_get(elevated, volume['id'])
volume = objects.Volume.get_by_id(elevated, volume.id)
try:
usage = db.quota_usage_get(elevated, project_id, 'volumes_new')
volumes_in_use = usage.in_use
@ -4804,19 +4816,19 @@ class VolumeMigrationTestCase(VolumeTestCase):
# check properties
if driver or diff_equal:
self.assertEqual(vol_type['id'], volume['volume_type_id'])
self.assertEqual('available', volume['status'])
self.assertEqual(CONF.host, volume['host'])
self.assertEqual(vol_type['id'], volume.volume_type_id)
self.assertEqual('available', volume.status)
self.assertEqual(CONF.host, volume.host)
self.assertEqual(1, volumes_in_use)
elif not exc:
self.assertEqual(old_vol_type['id'], volume['volume_type_id'])
self.assertEqual('retyping', volume['status'])
self.assertEqual(CONF.host, volume['host'])
self.assertEqual(old_vol_type['id'], volume.volume_type_id)
self.assertEqual('retyping', volume.status)
self.assertEqual(CONF.host, volume.host)
self.assertEqual(1, volumes_in_use)
else:
self.assertEqual(old_vol_type['id'], volume['volume_type_id'])
self.assertEqual('available', volume['status'])
self.assertEqual(CONF.host, volume['host'])
self.assertEqual(old_vol_type['id'], volume.volume_type_id)
self.assertEqual('available', volume.status)
self.assertEqual(CONF.host, volume.host)
self.assertEqual(0, volumes_in_use)
def test_retype_volume_driver_success(self):

View File

@ -399,7 +399,9 @@ class VolumeRpcAPITestCase(test.TestCase):
error=False,
version='1.10')
def test_retype(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
def test_retype(self, can_send_version):
class FakeHost(object):
def __init__(self):
self.host = 'host'
@ -407,12 +409,31 @@ class VolumeRpcAPITestCase(test.TestCase):
dest_host = FakeHost()
self._test_volume_api('retype',
rpc_method='cast',
volume=self.fake_volume,
volume=self.fake_volume_obj,
new_type_id='fake',
dest_host=dest_host,
migration_policy='never',
reservations=None,
version='1.34')
can_send_version.assert_called_once_with('1.34')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
def test_retype_old(self, can_send_version):
class FakeHost(object):
def __init__(self):
self.host = 'host'
self.capabilities = {}
dest_host = FakeHost()
self._test_volume_api('retype',
rpc_method='cast',
volume=self.fake_volume_obj,
new_type_id='fake',
dest_host=dest_host,
migration_policy='never',
reservations=None,
version='1.12')
can_send_version.assert_called_once_with('1.34')
def test_manage_existing(self):
self._test_volume_api('manage_existing',

View File

@ -456,8 +456,16 @@ class API(base.Base):
msg = _("The volume cannot be updated during maintenance.")
raise exception.InvalidVolume(reason=msg)
vref = self.db.volume_update(context, volume['id'], fields)
LOG.info(_LI("Volume updated successfully."), resource=vref)
# NOTE(thangp): Update is called by various APIs, some of which are
# not yet using oslo_versionedobjects. We need to handle the case
# where volume is either a dict or a oslo_versionedobject.
if isinstance(volume, objects_base.CinderObject):
volume.update(fields)
volume.save()
LOG.info(_LI("Volume updated successfully."), resource=volume)
else:
vref = self.db.volume_update(context, volume['id'], fields)
LOG.info(_LI("Volume updated successfully."), resource=vref)
def get(self, context, volume_id, viewable_admin_meta=False):
volume = objects.Volume.get_by_id(context, volume_id)
@ -1436,18 +1444,18 @@ class API(base.Base):
@wrap_check_policy
def retype(self, context, volume, new_type, migration_policy=None):
"""Attempt to modify the type associated with an existing volume."""
if volume['status'] not in ['available', 'in-use']:
if volume.status not in ['available', 'in-use']:
msg = _('Unable to update type due to incorrect status: '
'%(vol_status)s on volume: %(vol_id)s. Volume status '
'must be available or '
'in-use.') % {'vol_status': volume['status'],
'vol_id': volume['id']}
'in-use.') % {'vol_status': volume.status,
'vol_id': volume.id}
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
if self._is_volume_migrating(volume):
msg = (_("Volume %s is already part of an active migration.")
% volume['id'])
% volume.id)
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
@ -1457,8 +1465,7 @@ class API(base.Base):
LOG.error(msg)
raise exception.InvalidInput(reason=msg)
cg_id = volume.get('consistencygroup_id', None)
if cg_id:
if volume.consistencygroup_id:
msg = _("Volume must not be part of a consistency group.")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
@ -1480,16 +1487,16 @@ class API(base.Base):
vol_type_qos_id = vol_type['qos_specs_id']
old_vol_type = None
old_vol_type_id = volume['volume_type_id']
old_vol_type_id = volume.volume_type_id
old_vol_type_qos_id = None
# Error if the original and new type are the same
if volume['volume_type_id'] == vol_type_id:
if volume.volume_type_id == vol_type_id:
msg = _('New volume_type same as original: %s.') % new_type
LOG.error(msg)
raise exception.InvalidInput(reason=msg)
if volume['volume_type_id']:
if volume.volume_type_id:
old_vol_type = volume_types.get_volume_type(
context, old_vol_type_id)
old_vol_type_qos_id = old_vol_type['qos_specs_id']
@ -1506,14 +1513,14 @@ class API(base.Base):
# We don't support changing QoS at the front-end yet for in-use volumes
# TODO(avishay): Call Nova to change QoS setting (libvirt has support
# - virDomainSetBlockIoTune() - Nova does not have support yet).
if (volume['status'] != 'available' and
if (volume.status != 'available' and
old_vol_type_qos_id != vol_type_qos_id):
for qos_id in [old_vol_type_qos_id, vol_type_qos_id]:
if qos_id:
specs = qos_specs.get_qos_specs(context.elevated(), qos_id)
if specs['consumer'] != 'back-end':
msg = _('Retype cannot change front-end qos specs for '
'in-use volume: %s.') % volume['id']
'in-use volume: %s.') % volume.id
raise exception.InvalidInput(reason=msg)
# We're checking here in so that we can report any quota issues as
@ -1523,17 +1530,17 @@ class API(base.Base):
vol_type_id)
self.update(context, volume, {'status': 'retyping',
'previous_status': volume['status']})
'previous_status': volume.status})
request_spec = {'volume_properties': volume,
'volume_id': volume['id'],
'volume_id': volume.id,
'volume_type': vol_type,
'migration_policy': migration_policy,
'quota_reservations': reservations}
self.scheduler_rpcapi.retype(context, CONF.volume_topic, volume['id'],
self.scheduler_rpcapi.retype(context, CONF.volume_topic, volume.id,
request_spec=request_spec,
filter_properties={})
filter_properties={}, volume=volume)
LOG.info(_LI("Retype volume request issued successfully."),
resource=volume)

View File

@ -190,7 +190,7 @@ def locked_snapshot_operation(f):
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
RPC_API_VERSION = '1.33'
RPC_API_VERSION = '1.34'
target = messaging.Target(version=RPC_API_VERSION)
@ -2039,22 +2039,28 @@ class VolumeManager(manager.SchedulerDependentManager):
resource=volume)
def retype(self, ctxt, volume_id, new_type_id, host,
migration_policy='never', reservations=None):
migration_policy='never', reservations=None, volume=None):
def _retype_error(context, volume_id, old_reservations,
def _retype_error(context, volume, old_reservations,
new_reservations, status_update):
try:
self.db.volume_update(context, volume_id, status_update)
volume.update(status_update)
volume.save()
finally:
QUOTAS.rollback(context, old_reservations)
QUOTAS.rollback(context, new_reservations)
context = ctxt.elevated()
volume_ref = self.db.volume_get(ctxt, volume_id)
status_update = {'status': volume_ref['previous_status']}
if context.project_id != volume_ref['project_id']:
project_id = volume_ref['project_id']
# FIXME(thangp): Remove this in v2.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the volume
# by its volume_id.
volume = objects.Volume.get_by_id(context, volume_id)
status_update = {'status': volume.previous_status}
if context.project_id != volume.project_id:
project_id = volume.project_id
else:
project_id = context.project_id
@ -2069,19 +2075,21 @@ class VolumeManager(manager.SchedulerDependentManager):
# set the volume status to error. Should that be done
# here? Setting the volume back to it's original status
# for now.
self.db.volume_update(context, volume_id, status_update)
volume.update(status_update)
volume.save()
# Get old reservations
try:
reserve_opts = {'volumes': -1, 'gigabytes': -volume_ref['size']}
reserve_opts = {'volumes': -1, 'gigabytes': -volume.size}
QUOTAS.add_volume_type_opts(context,
reserve_opts,
volume_ref.get('volume_type_id'))
volume.volume_type_id)
old_reservations = QUOTAS.reserve(context,
project_id=project_id,
**reserve_opts)
except Exception:
self.db.volume_update(context, volume_id, status_update)
volume.update(status_update)
volume.save()
LOG.exception(_LE("Failed to update usages "
"while retyping volume."))
raise exception.CinderException(_("Failed to get old volume type"
@ -2093,7 +2101,7 @@ class VolumeManager(manager.SchedulerDependentManager):
# If volume types have the same contents, no need to do anything
retyped = False
diff, all_equal = volume_types.volume_types_diff(
context, volume_ref.get('volume_type_id'), new_type_id)
context, volume.volume_type_id, new_type_id)
if all_equal:
retyped = True
@ -2113,7 +2121,7 @@ class VolumeManager(manager.SchedulerDependentManager):
try:
new_type = volume_types.get_volume_type(context, new_type_id)
ret = self.driver.retype(context,
volume_ref,
volume,
new_type,
diff,
host)
@ -2125,49 +2133,49 @@ class VolumeManager(manager.SchedulerDependentManager):
retyped = ret
if retyped:
LOG.info(_LI("Volume %s: retyped successfully"), volume_id)
LOG.info(_LI("Volume %s: retyped successfully"), volume.id)
except Exception:
retyped = False
LOG.exception(_LE("Volume %s: driver error when trying to "
"retype, falling back to generic "
"mechanism."), volume_ref['id'])
"mechanism."), volume.id)
# We could not change the type, so we need to migrate the volume, where
# the destination volume will be of the new type
if not retyped:
if migration_policy == 'never':
_retype_error(context, volume_id, old_reservations,
_retype_error(context, volume, old_reservations,
new_reservations, status_update)
msg = _("Retype requires migration but is not allowed.")
raise exception.VolumeMigrationFailed(reason=msg)
snaps = objects.SnapshotList.get_all_for_volume(context,
volume_ref['id'])
volume.id)
if snaps:
_retype_error(context, volume_id, old_reservations,
_retype_error(context, volume, old_reservations,
new_reservations, status_update)
msg = _("Volume must not have snapshots.")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
# Don't allow volume with replicas to be migrated
rep_status = volume_ref['replication_status']
rep_status = volume.replication_status
if rep_status is not None and rep_status != 'disabled':
_retype_error(context, volume_id, old_reservations,
_retype_error(context, volume, old_reservations,
new_reservations, status_update)
msg = _("Volume must not be replicated.")
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
self.db.volume_update(context, volume_ref['id'],
{'migration_status': 'starting'})
volume.migration_status = 'starting'
volume.save()
try:
self.migrate_volume(context, volume_id, host,
self.migrate_volume(context, volume.id, host,
new_type_id=new_type_id)
except Exception:
with excutils.save_and_reraise_exception():
_retype_error(context, volume_id, old_reservations,
_retype_error(context, volume, old_reservations,
new_reservations, status_update)
else:
model_update = {'volume_type_id': new_type_id,
@ -2175,7 +2183,8 @@ class VolumeManager(manager.SchedulerDependentManager):
'status': status_update['status']}
if retype_model_update:
model_update.update(retype_model_update)
self.db.volume_update(context, volume_id, model_update)
volume.update(model_update)
volume.save()
if old_reservations:
QUOTAS.commit(context, old_reservations, project_id=project_id)
@ -2183,7 +2192,7 @@ class VolumeManager(manager.SchedulerDependentManager):
QUOTAS.commit(context, new_reservations, project_id=project_id)
self.publish_service_capabilities(context)
LOG.info(_LI("Retype volume completed successfully."),
resource=volume_ref)
resource=volume)
def manage_existing(self, ctxt, volume_id, ref=None):
try:

View File

@ -81,6 +81,7 @@ class VolumeAPI(object):
args. Forwarding CGSnapshot object instead of CGSnapshot_id.
1.32 - Adds support for sending objects over RPC in create_volume().
1.33 - Adds support for sending objects over RPC in delete_volume().
1.34 - Adds support for sending objects over RPC in retype().
"""
BASE_RPC_API_VERSION = '1.0'
@ -253,14 +254,20 @@ class VolumeAPI(object):
def retype(self, ctxt, volume, new_type_id, dest_host,
migration_policy='never', reservations=None):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.12')
host_p = {'host': dest_host.host,
'capabilities': dest_host.capabilities}
cctxt.cast(ctxt, 'retype', volume_id=volume['id'],
new_type_id=new_type_id, host=host_p,
migration_policy=migration_policy,
reservations=reservations)
msg_args = {'volume_id': volume.id, 'new_type_id': new_type_id,
'host': host_p, 'migration_policy': migration_policy,
'reservations': reservations}
if self.client.can_send_version('1.34'):
version = '1.34'
msg_args['volume'] = volume
else:
version = '1.12'
new_host = utils.extract_host(volume.host)
cctxt = self.client.prepare(server=new_host, version=version)
cctxt.cast(ctxt, 'retype', **msg_args)
def manage_existing(self, ctxt, volume, ref):
new_host = utils.extract_host(volume['host'])