diff --git a/cinder/scheduler/manager.py b/cinder/scheduler/manager.py index 5764c1da35d..fa06628aa16 100644 --- a/cinder/scheduler/manager.py +++ b/cinder/scheduler/manager.py @@ -56,7 +56,7 @@ LOG = logging.getLogger(__name__) class SchedulerManager(manager.Manager): """Chooses a host to create volumes.""" - RPC_API_VERSION = '1.10' + RPC_API_VERSION = '1.11' target = messaging.Target(version=RPC_API_VERSION) @@ -148,13 +148,18 @@ class SchedulerManager(manager.Manager): def migrate_volume_to_host(self, context, topic, volume_id, host, force_host_copy, request_spec, - filter_properties=None): + filter_properties=None, volume=None): """Ensure that the host exists and can accept the volume.""" self._wait_for_scheduler() + # FIXME(thangp): Remove this in v2.0 of RPC API. + if volume is None: + # For older clients, mimic the old behavior and look up the + # volume by its volume_id. + volume = objects.Volume.get_by_id(context, volume_id) + def _migrate_volume_set_error(self, context, ex, request_spec): - volume = db.volume_get(context, request_spec['volume_id']) if volume.status == 'maintenance': previous_status = ( volume.previous_status or 'maintenance') @@ -176,8 +181,7 @@ class SchedulerManager(manager.Manager): with excutils.save_and_reraise_exception(): _migrate_volume_set_error(self, context, ex, request_spec) else: - volume_ref = db.volume_get(context, volume_id) - volume_rpcapi.VolumeAPI().migrate_volume(context, volume_ref, + volume_rpcapi.VolumeAPI().migrate_volume(context, volume, tgt_host, force_host_copy) diff --git a/cinder/scheduler/rpcapi.py b/cinder/scheduler/rpcapi.py index 4e9769c72a9..e451336e7a1 100644 --- a/cinder/scheduler/rpcapi.py +++ b/cinder/scheduler/rpcapi.py @@ -44,6 +44,8 @@ class SchedulerAPI(object): 1.8 - Add sending object over RPC in create_consistencygroup method 1.9 - Adds support for sending objects over RPC in create_volume() 1.10 - Adds support for sending objects over RPC in retype() + 1.11 - Adds support for sending objects over RPC in + migrate_volume_to_host() """ RPC_API_VERSION = '1.0' @@ -95,17 +97,20 @@ class SchedulerAPI(object): def migrate_volume_to_host(self, ctxt, topic, volume_id, host, force_host_copy=False, request_spec=None, - filter_properties=None): - - cctxt = self.client.prepare(version='1.3') + filter_properties=None, volume=None): request_spec_p = jsonutils.to_primitive(request_spec) - return cctxt.cast(ctxt, 'migrate_volume_to_host', - topic=topic, - volume_id=volume_id, - host=host, - force_host_copy=force_host_copy, - request_spec=request_spec_p, - filter_properties=filter_properties) + msg_args = {'topic': topic, 'volume_id': volume_id, + 'host': host, 'force_host_copy': force_host_copy, + 'request_spec': request_spec_p, + 'filter_properties': filter_properties} + if self.client.can_send_version('1.11'): + version = '1.11' + msg_args['volume'] = volume + else: + version = '1.3' + + cctxt = self.client.prepare(version=version) + return cctxt.cast(ctxt, 'migrate_volume_to_host', **msg_args) def retype(self, ctxt, topic, volume_id, request_spec=None, filter_properties=None, volume=None): diff --git a/cinder/tests/unit/api/contrib/test_admin_actions.py b/cinder/tests/unit/api/contrib/test_admin_actions.py index a6da59f1a60..abfcec05074 100644 --- a/cinder/tests/unit/api/contrib/test_admin_actions.py +++ b/cinder/tests/unit/api/contrib/test_admin_actions.py @@ -801,7 +801,7 @@ class AdminActionsTest(test.TestCase): force_host_copy=False): admin_ctx = context.get_admin_context() # build request to migrate to host - req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id']) + req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume.id) req.method = 'POST' req.headers['content-type'] = 'application/json' body = {'os-migrate_volume': {'host': host, @@ -811,7 +811,7 @@ class AdminActionsTest(test.TestCase): resp = req.get_response(app()) # verify status self.assertEqual(expected_status, resp.status_int) - volume = db.volume_get(admin_ctx, volume['id']) + volume = objects.Volume.get_by_id(admin_ctx, volume.id) return volume def test_migrate_volume_success(self): diff --git a/cinder/tests/unit/scheduler/test_rpcapi.py b/cinder/tests/unit/scheduler/test_rpcapi.py index 60c935f195a..74de0baec62 100644 --- a/cinder/tests/unit/scheduler/test_rpcapi.py +++ b/cinder/tests/unit/scheduler/test_rpcapi.py @@ -117,7 +117,9 @@ class SchedulerRpcAPITestCase(test.TestCase): version='1.2') can_send_version.assert_called_once_with('1.9') - def test_migrate_volume_to_host(self): + @mock.patch('oslo_messaging.RPCClient.can_send_version', + return_value=True) + def test_migrate_volume_to_host(self, can_send_version): self._test_scheduler_api('migrate_volume_to_host', rpc_method='cast', topic='topic', @@ -126,7 +128,24 @@ class SchedulerRpcAPITestCase(test.TestCase): force_host_copy=True, request_spec='fake_request_spec', filter_properties='filter_properties', + volume='volume', + version='1.11') + can_send_version.assert_called_once_with('1.11') + + @mock.patch('oslo_messaging.RPCClient.can_send_version', + return_value=False) + def test_migrate_volume_to_host_old(self, can_send_version): + self._test_scheduler_api('migrate_volume_to_host', + rpc_method='cast', + topic='topic', + volume_id='volume_id', + host='host', + force_host_copy=True, + request_spec='fake_request_spec', + filter_properties='filter_properties', + volume='volume', version='1.3') + can_send_version.assert_called_once_with('1.11') @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True) diff --git a/cinder/tests/unit/test_volume.py b/cinder/tests/unit/test_volume.py index e4c54bf090c..08c9010b742 100644 --- a/cinder/tests/unit/test_volume.py +++ b/cinder/tests/unit/test_volume.py @@ -4183,22 +4183,20 @@ class VolumeTestCase(BaseVolumeTestCase): def test_clean_temporary_volume(self): def fake_delete_volume(ctxt, volume): - db.volume_destroy(ctxt, volume['id']) + volume.destroy() fake_volume = tests_utils.create_volume(self.context, size=1, - host=CONF.host) + host=CONF.host, + migration_status='migrating') fake_new_volume = tests_utils.create_volume(self.context, size=1, host=CONF.host) - # Check when the migrated volume is in migration - db.volume_update(self.context, fake_volume['id'], - {'migration_status': 'migrating'}) # 1. Only clean the db - self.volume._clean_temporary_volume(self.context, fake_volume['id'], - fake_new_volume['id'], + self.volume._clean_temporary_volume(self.context, fake_volume, + fake_new_volume, clean_db_only=True) self.assertRaises(exception.VolumeNotFound, db.volume_get, self.context, - fake_new_volume['id']) + fake_new_volume.id) # 2. Delete the backend storage fake_new_volume = tests_utils.create_volume(self.context, size=1, @@ -4207,23 +4205,23 @@ class VolumeTestCase(BaseVolumeTestCase): mock_delete_volume: mock_delete_volume.side_effect = fake_delete_volume self.volume._clean_temporary_volume(self.context, - fake_volume['id'], - fake_new_volume['id'], + fake_volume, + fake_new_volume, clean_db_only=False) self.assertRaises(exception.VolumeNotFound, db.volume_get, self.context, - fake_new_volume['id']) + fake_new_volume.id) # Check when the migrated volume is not in migration fake_new_volume = tests_utils.create_volume(self.context, size=1, host=CONF.host) - db.volume_update(self.context, fake_volume['id'], - {'migration_status': 'non-migrating'}) - self.volume._clean_temporary_volume(self.context, fake_volume['id'], - fake_new_volume['id']) + fake_volume.migration_status = 'non-migrating' + fake_volume.save() + self.volume._clean_temporary_volume(self.context, fake_volume, + fake_new_volume) volume = db.volume_get(context.get_admin_context(), - fake_new_volume['id']) - self.assertIsNone(volume['migration_status']) + fake_new_volume.id) + self.assertIsNone(volume.migration_status) def test_update_volume_readonly_flag(self): """Test volume readonly flag can be updated at API level.""" @@ -4323,13 +4321,14 @@ class VolumeMigrationTestCase(VolumeTestCase): host=CONF.host, migration_status='migrating') host_obj = {'host': 'newhost', 'capabilities': {}} - self.volume.migrate_volume(self.context, volume['id'], - host_obj, False) + self.volume.migrate_volume(self.context, volume.id, host_obj, False, + volume=volume) # check volume properties - volume = db.volume_get(context.get_admin_context(), volume['id']) - self.assertEqual('newhost', volume['host']) - self.assertEqual('success', volume['migration_status']) + volume = objects.Volume.get_by_id(context.get_admin_context(), + volume.id) + self.assertEqual('newhost', volume.host) + self.assertEqual('success', volume.migration_status) def _fake_create_volume(self, ctxt, volume, host, req_spec, filters, allow_reschedule=True): @@ -4351,12 +4350,14 @@ class VolumeMigrationTestCase(VolumeTestCase): self.assertRaises(processutils.ProcessExecutionError, self.volume.migrate_volume, self.context, - volume['id'], + volume.id, host_obj, - False) - volume = db.volume_get(context.get_admin_context(), volume['id']) - self.assertEqual('error', volume['migration_status']) - self.assertEqual('available', volume['status']) + False, + volume=volume) + volume = objects.Volume.get_by_id(context.get_admin_context(), + volume.id) + self.assertEqual('error', volume.migration_status) + self.assertEqual('available', volume.status) @mock.patch('cinder.compute.API') @mock.patch('cinder.volume.manager.VolumeManager.' @@ -4366,7 +4367,10 @@ class VolumeMigrationTestCase(VolumeTestCase): migrate_volume_completion, nova_api): fake_volume_id = 'fake_volume_id' - fake_new_volume = {'status': 'available', 'id': fake_volume_id} + fake_db_new_volume = {'status': 'available', 'id': fake_volume_id} + fake_new_volume = fake_volume.fake_db_volume(**fake_db_new_volume) + new_volume_obj = fake_volume.fake_volume_obj(self.context, + **fake_new_volume) host_obj = {'host': 'newhost', 'capabilities': {}} volume_get.return_value = fake_new_volume update_server_volume = nova_api.return_value.update_server_volume @@ -4377,12 +4381,10 @@ class VolumeMigrationTestCase(VolumeTestCase): self.volume._migrate_volume_generic(self.context, volume, host_obj, None) mock_copy_volume.assert_called_with(self.context, volume, - fake_new_volume, + new_volume_obj, remote='dest') - migrate_volume_completion.assert_called_with(self.context, - volume['id'], - fake_new_volume['id'], - error=False) + migrate_volume_completion.assert_called_with( + self.context, volume.id, new_volume_obj.id, error=False) self.assertFalse(update_server_volume.called) @mock.patch('cinder.compute.API') @@ -4421,6 +4423,7 @@ class VolumeMigrationTestCase(VolumeTestCase): rpc_delete_volume, update_migrated_volume): fake_volume = tests_utils.create_volume(self.context, size=1, + previous_status='available', host=CONF.host) host_obj = {'host': 'newhost', 'capabilities': {}} @@ -4430,12 +4433,12 @@ class VolumeMigrationTestCase(VolumeTestCase): mock.patch.object(self.volume.driver, 'delete_volume') as \ delete_volume: create_volume.side_effect = self._fake_create_volume - self.volume.migrate_volume(self.context, fake_volume['id'], - host_obj, True) - volume = db.volume_get(context.get_admin_context(), - fake_volume['id']) - self.assertEqual('newhost', volume['host']) - self.assertEqual('success', volume['migration_status']) + self.volume.migrate_volume(self.context, fake_volume.id, + host_obj, True, volume=fake_volume) + volume = objects.Volume.get_by_id(context.get_admin_context(), + fake_volume.id) + self.assertEqual('newhost', volume.host) + self.assertEqual('success', volume.migration_status) self.assertFalse(mock_migrate_volume.called) self.assertFalse(delete_volume.called) self.assertTrue(rpc_delete_volume.called) @@ -4461,12 +4464,14 @@ class VolumeMigrationTestCase(VolumeTestCase): self.assertRaises(processutils.ProcessExecutionError, self.volume.migrate_volume, self.context, - volume['id'], + volume.id, host_obj, - True) - volume = db.volume_get(context.get_admin_context(), volume['id']) - self.assertEqual('error', volume['migration_status']) - self.assertEqual('available', volume['status']) + True, + volume=volume) + volume = objects.Volume.get_by_id(context.get_admin_context(), + volume.id) + self.assertEqual('error', volume.migration_status) + self.assertEqual('available', volume.status) @mock.patch('cinder.db.volume_update') def test_update_migrated_volume(self, volume_update): @@ -4474,7 +4479,8 @@ class VolumeMigrationTestCase(VolumeTestCase): fake_new_host = 'fake_new_host' fake_update = {'_name_id': 'updated_id', 'provider_location': 'updated_location'} - fake_elevated = 'fake_elevated' + fake_elevated = context.RequestContext('fake', self.project_id, + is_admin=True) volume = tests_utils.create_volume(self.context, size=1, status='available', host=fake_host) @@ -4484,13 +4490,13 @@ class VolumeMigrationTestCase(VolumeTestCase): provider_location='fake_provider_location', _name_id='fake_name_id', host=fake_new_host) - new_volume['_name_id'] = 'fake_name_id' - new_volume['provider_location'] = 'fake_provider_location' - fake_update_error = {'_name_id': new_volume['_name_id'], + new_volume._name_id = 'fake_name_id' + new_volume.provider_location = 'fake_provider_location' + fake_update_error = {'_name_id': new_volume._name_id, 'provider_location': - new_volume['provider_location']} - expected_update = {'_name_id': volume['_name_id'], - 'provider_location': volume['provider_location']} + new_volume.provider_location} + expected_update = {'_name_id': volume._name_id, + 'provider_location': volume.provider_location} with mock.patch.object(self.volume.driver, 'update_migrated_volume') as migrate_update,\ mock.patch.object(self.context, 'elevated') as elevated: @@ -4499,19 +4505,23 @@ class VolumeMigrationTestCase(VolumeTestCase): self.volume.update_migrated_volume(self.context, volume, new_volume, 'available') volume_update.assert_has_calls(( - mock.call(fake_elevated, new_volume['id'], expected_update), - mock.call(fake_elevated, volume['id'], fake_update))) + mock.call(fake_elevated, new_volume.id, expected_update), + mock.call(fake_elevated, volume.id, fake_update))) # Test the case for update_migrated_volume not implemented # for the driver. migrate_update.reset_mock() volume_update.reset_mock() + # Reset the volume objects to their original value, since they + # were changed in the last call. + new_volume._name_id = 'fake_name_id' + new_volume.provider_location = 'fake_provider_location' migrate_update.side_effect = NotImplementedError self.volume.update_migrated_volume(self.context, volume, new_volume, 'available') volume_update.assert_has_calls(( - mock.call(fake_elevated, new_volume['id'], expected_update), - mock.call(fake_elevated, volume['id'], fake_update_error))) + mock.call(fake_elevated, new_volume.id, fake_update), + mock.call(fake_elevated, volume.id, fake_update_error))) def test_migrate_volume_generic_create_volume_error(self): self.expected_status = 'error' @@ -4530,10 +4540,12 @@ class VolumeMigrationTestCase(VolumeTestCase): self.assertRaises(exception.VolumeMigrationFailed, self.volume.migrate_volume, self.context, - volume['id'], + volume.id, host_obj, - True) - volume = db.volume_get(context.get_admin_context(), volume['id']) + True, + volume=volume) + volume = objects.Volume.get_by_id(context.get_admin_context(), + volume.id) self.assertEqual('error', volume['migration_status']) self.assertEqual('available', volume['status']) self.assertTrue(clean_temporary_volume.called) @@ -4558,10 +4570,12 @@ class VolumeMigrationTestCase(VolumeTestCase): self.assertRaises(exception.VolumeMigrationFailed, self.volume.migrate_volume, self.context, - volume['id'], + volume.id, host_obj, - True) - volume = db.volume_get(context.get_admin_context(), volume['id']) + True, + volume=volume) + volume = objects.Volume.get_by_id(context.get_admin_context(), + volume.id) self.assertEqual('error', volume['migration_status']) self.assertEqual('available', volume['status']) self.assertTrue(clean_temporary_volume.called) @@ -4588,10 +4602,12 @@ class VolumeMigrationTestCase(VolumeTestCase): self.assertRaises(processutils.ProcessExecutionError, self.volume.migrate_volume, self.context, - volume['id'], + volume.id, host_obj, - True) - volume = db.volume_get(context.get_admin_context(), volume['id']) + True, + volume=volume) + volume = objects.Volume.get_by_id(context.get_admin_context(), + volume.id) self.assertEqual('error', volume['migration_status']) self.assertEqual('available', volume['status']) @@ -4634,9 +4650,10 @@ class VolumeMigrationTestCase(VolumeTestCase): self.assertRaises(processutils.ProcessExecutionError, self.volume.migrate_volume, self.context, - volume['id'], + volume.id, host_obj, - True) + True, + volume=volume) volume = db.volume_get(context.get_admin_context(), volume['id']) self.assertEqual('error', volume['migration_status']) self.assertEqual('available', volume['status']) @@ -4649,7 +4666,7 @@ class VolumeMigrationTestCase(VolumeTestCase): previous_status='available'): def fake_attach_volume(ctxt, volume, instance_uuid, host_name, mountpoint, mode): - tests_utils.attach_volume(ctxt, volume['id'], + tests_utils.attach_volume(ctxt, volume.id, instance_uuid, host_name, '/dev/vda') @@ -4661,12 +4678,12 @@ class VolumeMigrationTestCase(VolumeTestCase): previous_status=previous_status) attachment_id = None if status == 'in-use': - vol = tests_utils.attach_volume(self.context, old_volume['id'], + vol = tests_utils.attach_volume(self.context, old_volume.id, instance_uuid, attached_host, '/dev/vda') self.assertEqual('in-use', vol['status']) attachment_id = vol['volume_attachment'][0]['id'] - target_status = 'target:%s' % old_volume['id'] + target_status = 'target:%s' % old_volume.id new_host = CONF.host + 'new' new_volume = tests_utils.create_volume(self.context, size=0, host=new_host, @@ -4681,16 +4698,18 @@ class VolumeMigrationTestCase(VolumeTestCase): 'update_migrated_volume'),\ mock.patch.object(self.volume.driver, 'attach_volume'): mock_attach_volume.side_effect = fake_attach_volume - self.volume.migrate_volume_completion(self.context, old_volume[ - 'id'], new_volume['id']) - after_new_volume = db.volume_get(self.context, new_volume.id) - after_old_volume = db.volume_get(self.context, old_volume.id) + self.volume.migrate_volume_completion(self.context, old_volume.id, + new_volume.id) + after_new_volume = objects.Volume.get_by_id(self.context, + new_volume.id) + after_old_volume = objects.Volume.get_by_id(self.context, + old_volume.id) if status == 'in-use': mock_detach_volume.assert_called_with(self.context, - old_volume['id'], + old_volume.id, attachment_id) attachment = db.volume_attachment_get_by_instance_uuid( - self.context, old_volume['id'], instance_uuid) + self.context, old_volume.id, instance_uuid) self.assertIsNotNone(attachment) self.assertEqual(attached_host, attachment['attached_host']) self.assertEqual(instance_uuid, attachment['instance_uuid']) @@ -4865,10 +4884,11 @@ class VolumeMigrationTestCase(VolumeTestCase): self.volume.driver._initialized = False self.assertRaises(exception.DriverNotInitialized, self.volume.migrate_volume, - self.context, volume['id'], - host_obj, True) + self.context, volume.id, host_obj, True, + volume=volume) - volume = db.volume_get(context.get_admin_context(), volume['id']) + volume = objects.Volume.get_by_id(context.get_admin_context(), + volume.id) self.assertEqual('error', volume.migration_status) # lets cleanup the mess. diff --git a/cinder/tests/unit/test_volume_rpcapi.py b/cinder/tests/unit/test_volume_rpcapi.py index ccb2f0265a6..22f0c906c4d 100644 --- a/cinder/tests/unit/test_volume_rpcapi.py +++ b/cinder/tests/unit/test_volume_rpcapi.py @@ -146,7 +146,6 @@ class VolumeRpcAPITestCase(test.TestCase): expected_msg['host'] = dest_host_dict if 'new_volume' in expected_msg: volume = expected_msg['new_volume'] - del expected_msg['new_volume'] expected_msg['new_volume_id'] = volume['id'] if 'host' in kwargs: @@ -392,7 +391,9 @@ class VolumeRpcAPITestCase(test.TestCase): version='1.14') can_send_version.assert_called_once_with('1.35') - def test_migrate_volume(self): + @mock.patch('oslo_messaging.RPCClient.can_send_version', + return_value=True) + def test_migrate_volume(self, can_send_version): class FakeHost(object): def __init__(self): self.host = 'host' @@ -400,18 +401,49 @@ class VolumeRpcAPITestCase(test.TestCase): dest_host = FakeHost() self._test_volume_api('migrate_volume', rpc_method='cast', - volume=self.fake_volume, + volume=self.fake_volume_obj, + dest_host=dest_host, + force_host_copy=True, + version='1.36') + can_send_version.assert_called_once_with('1.36') + + @mock.patch('oslo_messaging.RPCClient.can_send_version', + return_value=False) + def test_migrate_volume_old(self, can_send_version): + class FakeHost(object): + def __init__(self): + self.host = 'host' + self.capabilities = {} + dest_host = FakeHost() + self._test_volume_api('migrate_volume', + rpc_method='cast', + volume=self.fake_volume_obj, dest_host=dest_host, force_host_copy=True, version='1.8') + can_send_version.assert_called_once_with('1.36') - def test_migrate_volume_completion(self): + @mock.patch('oslo_messaging.RPCClient.can_send_version', + return_value=True) + def test_migrate_volume_completion(self, can_send_version): self._test_volume_api('migrate_volume_completion', rpc_method='call', - volume=self.fake_volume, - new_volume=self.fake_volume, + volume=self.fake_volume_obj, + new_volume=self.fake_volume_obj, + error=False, + version='1.36') + can_send_version.assert_called_once_with('1.36') + + @mock.patch('oslo_messaging.RPCClient.can_send_version', + return_value=False) + def test_migrate_volume_completion_old(self, can_send_version): + self._test_volume_api('migrate_volume_completion', + rpc_method='call', + volume=self.fake_volume_obj, + new_volume=self.fake_volume_obj, error=False, version='1.10') + can_send_version.assert_called_once_with('1.36') @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True) diff --git a/cinder/volume/api.py b/cinder/volume/api.py index ac01271a3ea..9341399a4cf 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -1275,39 +1275,38 @@ class API(base.Base): lock_volume): """Migrate the volume to the specified host.""" - if volume['status'] not in ['available', 'in-use']: + if volume.status not in ['available', 'in-use']: msg = _('Volume %(vol_id)s status must be available or in-use, ' 'but current status is: ' - '%(vol_status)s.') % {'vol_id': volume['id'], - 'vol_status': volume['status']} + '%(vol_status)s.') % {'vol_id': volume.id, + 'vol_status': volume.status} LOG.error(msg) raise exception.InvalidVolume(reason=msg) # Make sure volume is not part of a migration. if self._is_volume_migrating(volume): msg = _("Volume %s is already part of an active " - "migration.") % volume['id'] + "migration.") % volume.id LOG.error(msg) raise exception.InvalidVolume(reason=msg) # We only handle volumes without snapshots for now - snaps = objects.SnapshotList.get_all_for_volume(context, volume['id']) + snaps = objects.SnapshotList.get_all_for_volume(context, volume.id) if snaps: - msg = _("Volume %s must not have snapshots.") % volume['id'] + msg = _("Volume %s must not have snapshots.") % volume.id LOG.error(msg) raise exception.InvalidVolume(reason=msg) # We only handle non-replicated volumes for now - rep_status = volume['replication_status'] - if rep_status is not None and rep_status != 'disabled': - msg = _("Volume %s must not be replicated.") % volume['id'] + if (volume.replication_status is not None and + volume.replication_status != 'disabled'): + msg = _("Volume %s must not be replicated.") % volume.id LOG.error(msg) raise exception.InvalidVolume(reason=msg) - cg_id = volume.get('consistencygroup_id', None) - if cg_id: + if volume.consistencygroup_id: msg = _("Volume %s must not be part of a consistency " - "group.") % volume['id'] + "group.") % volume.id LOG.error(msg) raise exception.InvalidVolume(reason=msg) @@ -1327,7 +1326,7 @@ class API(base.Base): raise exception.InvalidHost(reason=msg) # Make sure the destination host is different than the current one - if host == volume['host']: + if host == volume.host: msg = _('Destination host must be different ' 'than the current host.') LOG.error(msg) @@ -1340,27 +1339,27 @@ class API(base.Base): # that this volume is in maintenance mode, and no action is allowed # on this volume, e.g. attach, detach, retype, migrate, etc. updates = {'migration_status': 'starting', - 'previous_status': volume['status']} - if lock_volume and volume['status'] == 'available': + 'previous_status': volume.status} + if lock_volume and volume.status == 'available': updates['status'] = 'maintenance' self.update(context, volume, updates) # Call the scheduler to ensure that the host exists and that it can # accept the volume volume_type = {} - volume_type_id = volume['volume_type_id'] - if volume_type_id: + if volume.volume_type_id: volume_type = volume_types.get_volume_type(context.elevated(), - volume_type_id) + volume.volume_type_id) request_spec = {'volume_properties': volume, 'volume_type': volume_type, - 'volume_id': volume['id']} + 'volume_id': volume.id} self.scheduler_rpcapi.migrate_volume_to_host(context, CONF.volume_topic, - volume['id'], + volume.id, host, force_host_copy, - request_spec) + request_spec, + volume=volume) LOG.info(_LI("Migrate volume request issued successfully."), resource=volume) @@ -1368,34 +1367,34 @@ class API(base.Base): def migrate_volume_completion(self, context, volume, new_volume, error): # This is a volume swap initiated by Nova, not Cinder. Nova expects # us to return the new_volume_id. - if not (volume['migration_status'] or new_volume['migration_status']): + if not (volume.migration_status or new_volume.migration_status): # Don't need to do migration, but still need to finish the # volume attach and detach so volumes don't end in 'attaching' # and 'detaching' state - attachments = volume['volume_attachment'] + attachments = volume.volume_attachment for attachment in attachments: - self.detach(context, volume, attachment['id']) + self.detach(context, volume, attachment.id) self.attach(context, new_volume, - attachment['instance_uuid'], - attachment['attached_host'], - attachment['mountpoint'], + attachment.instance_uuid, + attachment.attached_host, + attachment.mountpoint, 'rw') - return new_volume['id'] + return new_volume.id - if not volume['migration_status']: + if not volume.migration_status: msg = _('Source volume not mid-migration.') raise exception.InvalidVolume(reason=msg) - if not new_volume['migration_status']: + if not new_volume.migration_status: msg = _('Destination volume not mid-migration.') raise exception.InvalidVolume(reason=msg) - expected_status = 'target:%s' % volume['id'] - if not new_volume['migration_status'] == expected_status: + expected_status = 'target:%s' % volume.id + if not new_volume.migration_status == expected_status: msg = (_('Destination has migration_status %(stat)s, expected ' - '%(exp)s.') % {'stat': new_volume['migration_status'], + '%(exp)s.') % {'stat': new_volume.migration_status, 'exp': expected_status}) raise exception.InvalidVolume(reason=msg) diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index 86908de0f70..d63e2360c89 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -197,7 +197,7 @@ def locked_snapshot_operation(f): class VolumeManager(manager.SchedulerDependentManager): """Manages attachable block storage devices.""" - RPC_API_VERSION = '1.35' + RPC_API_VERSION = '1.36' target = messaging.Target(version=RPC_API_VERSION) @@ -1626,35 +1626,38 @@ class VolumeManager(manager.SchedulerDependentManager): # Wait for new_volume to become ready starttime = time.time() deadline = starttime + CONF.migration_create_volume_timeout_secs - new_volume = self.db.volume_get(ctxt, new_volume['id']) + # TODO(thangp): Replace get_by_id with refresh when it is available + new_volume = objects.Volume.get_by_id(ctxt, new_volume.id) tries = 0 - while new_volume['status'] != 'available': + while new_volume.status != 'available': tries += 1 now = time.time() - if new_volume['status'] == 'error': + if new_volume.status == 'error': msg = _("failed to create new_volume on destination host") - self._clean_temporary_volume(ctxt, volume['id'], - new_volume['id'], + self._clean_temporary_volume(ctxt, volume, + new_volume, clean_db_only=True) raise exception.VolumeMigrationFailed(reason=msg) elif now > deadline: msg = _("timeout creating new_volume on destination host") - self._clean_temporary_volume(ctxt, volume['id'], - new_volume['id'], + self._clean_temporary_volume(ctxt, volume, + new_volume, clean_db_only=True) raise exception.VolumeMigrationFailed(reason=msg) else: time.sleep(tries ** 2) - new_volume = self.db.volume_get(ctxt, new_volume['id']) + # TODO(thangp): Replace get_by_id with refresh when it is + # available + new_volume = objects.Volume.get_by_id(ctxt, new_volume.id) # Copy the source volume to the destination volume try: - attachments = volume['volume_attachment'] + attachments = volume.volume_attachment if not attachments: self._copy_volume_data(ctxt, volume, new_volume, remote='dest') # The above call is synchronous so we complete the migration - self.migrate_volume_completion(ctxt, volume['id'], - new_volume['id'], + self.migrate_volume_completion(ctxt, volume.id, + new_volume.id, error=False) else: nova_api = compute.API() @@ -1663,58 +1666,63 @@ class VolumeManager(manager.SchedulerDependentManager): for attachment in attachments: instance_uuid = attachment['instance_uuid'] nova_api.update_server_volume(ctxt, instance_uuid, - volume['id'], - new_volume['id']) + volume.id, + new_volume.id) except Exception: with excutils.save_and_reraise_exception(): LOG.error(_LE("Failed to copy volume %(vol1)s to %(vol2)s"), - {'vol1': volume['id'], 'vol2': new_volume['id']}) - self._clean_temporary_volume(ctxt, volume['id'], - new_volume['id']) + {'vol1': volume.id, 'vol2': new_volume.id}) + self._clean_temporary_volume(ctxt, volume, + new_volume) - def _clean_temporary_volume(self, ctxt, volume_id, new_volume_id, + def _clean_temporary_volume(self, ctxt, volume, new_volume, clean_db_only=False): - volume = self.db.volume_get(ctxt, volume_id) # If we're in the migrating phase, we need to cleanup # destination volume because source volume is remaining - if volume['migration_status'] == 'migrating': + if volume.migration_status == 'migrating': try: if clean_db_only: # The temporary volume is not created, only DB data # is created - self.db.volume_destroy(ctxt, new_volume_id) + new_volume.destroy() else: # The temporary volume is already created rpcapi = volume_rpcapi.VolumeAPI() - volume = self.db.volume_get(ctxt, new_volume_id) - rpcapi.delete_volume(ctxt, volume) + rpcapi.delete_volume(ctxt, new_volume) except exception.VolumeNotFound: LOG.info(_LI("Couldn't find the temporary volume " "%(vol)s in the database. There is no need " "to clean up this volume."), - {'vol': new_volume_id}) + {'vol': new_volume.id}) else: # If we're in the completing phase don't delete the # destination because we may have already deleted the # source! But the migration_status in database should # be cleared to handle volume after migration failure try: - updates = {'migration_status': None} - self.db.volume_update(ctxt, new_volume_id, updates) + new_volume.migration_status = None + new_volume.save() except exception.VolumeNotFound: LOG.info(_LI("Couldn't find destination volume " "%(vol)s in the database. The entry might be " "successfully deleted during migration " "completion phase."), - {'vol': new_volume_id}) + {'vol': new_volume.id}) LOG.warning(_LW("Failed to migrate volume. The destination " "volume %(vol)s is not deleted since the " "source volume may have been deleted."), - {'vol': new_volume_id}) + {'vol': new_volume.id}) def migrate_volume_completion(self, ctxt, volume_id, new_volume_id, - error=False): + error=False, volume=None, new_volume=None): + # FIXME(thangp): Remove this in v2.0 of RPC API. + if volume is None or new_volume is None: + # For older clients, mimic the old behavior and look up the volume + # by its volume_id. + volume = objects.Volume.get_by_id(ctxt, volume_id) + new_volume = objects.Volume.get_by_id(ctxt, new_volume_id) + try: # NOTE(flaper87): Verify the driver is enabled # before going forward. The exception will be caught @@ -1722,37 +1730,36 @@ class VolumeManager(manager.SchedulerDependentManager): utils.require_driver_initialized(self.driver) except exception.DriverNotInitialized: with excutils.save_and_reraise_exception(): - self.db.volume_update(ctxt, volume_id, - {'migration_status': 'error'}) + volume.migration_status = 'error' + volume.save() LOG.debug("migrate_volume_completion: completing migration for " "volume %(vol1)s (temporary volume %(vol2)s", - {'vol1': volume_id, 'vol2': new_volume_id}) - volume = self.db.volume_get(ctxt, volume_id) - new_volume = self.db.volume_get(ctxt, new_volume_id) + {'vol1': volume.id, 'vol2': new_volume.id}) rpcapi = volume_rpcapi.VolumeAPI() - orig_volume_status = volume['previous_status'] + orig_volume_status = volume.previous_status if error: LOG.info(_LI("migrate_volume_completion is cleaning up an error " "for volume %(vol1)s (temporary volume %(vol2)s"), - {'vol1': volume['id'], 'vol2': new_volume['id']}) + {'vol1': volume['id'], 'vol2': new_volume.id}) rpcapi.delete_volume(ctxt, new_volume) updates = {'migration_status': 'error', 'status': orig_volume_status} - self.db.volume_update(ctxt, volume_id, updates) - return volume_id + volume.update(updates) + volume.save() + return volume.id - self.db.volume_update(ctxt, volume_id, - {'migration_status': 'completing'}) + volume.migration_status = 'completing' + volume.save() # Detach the source volume (if it fails, don't fail the migration) try: if orig_volume_status == 'in-use': - attachments = volume['volume_attachment'] + attachments = volume.volume_attachment for attachment in attachments: - self.detach_volume(ctxt, volume_id, attachment['id']) + self.detach_volume(ctxt, volume.id, attachment['id']) except Exception as ex: LOG.error(_LE("Detach migration source volume failed: %(err)s"), {'err': ex}, resource=volume) @@ -1767,20 +1774,21 @@ class VolumeManager(manager.SchedulerDependentManager): # Swap src and dest DB records so we can continue using the src id and # asynchronously delete the destination id __, updated_new = self.db.finish_volume_migration( - ctxt, volume_id, new_volume_id) + ctxt, volume.id, new_volume.id) updates = {'status': orig_volume_status, - 'previous_status': volume['status'], + 'previous_status': volume.status, 'migration_status': 'success'} if orig_volume_status == 'in-use': - attachments = volume['volume_attachment'] + attachments = volume.volume_attachment for attachment in attachments: rpcapi.attach_volume(ctxt, volume, attachment['instance_uuid'], attachment['attached_host'], attachment['mountpoint'], 'rw') - self.db.volume_update(ctxt, volume_id, updates) + volume.update(updates) + volume.save() # Asynchronous deletion of the source volume in the back-end (now # pointed by the target volume id) @@ -1789,15 +1797,21 @@ class VolumeManager(manager.SchedulerDependentManager): except Exception as ex: LOG.error(_LE('Failed to request async delete of migration source ' 'vol %(vol)s: %(err)s'), - {'vol': volume_id, 'err': ex}) + {'vol': volume.id, 'err': ex}) LOG.info(_LI("Complete-Migrate volume completed successfully."), resource=volume) - return volume['id'] + return volume.id def migrate_volume(self, ctxt, volume_id, host, force_host_copy=False, - new_type_id=None): + new_type_id=None, volume=None): """Migrate the volume to the specified host (called on source host).""" + # FIXME(thangp): Remove this in v2.0 of RPC API. + if volume is None: + # For older clients, mimic the old behavior and look up the volume + # by its volume_id. + volume = objects.Volume.get_by_id(context, volume_id) + try: # NOTE(flaper87): Verify the driver is enabled # before going forward. The exception will be caught @@ -1805,54 +1819,54 @@ class VolumeManager(manager.SchedulerDependentManager): utils.require_driver_initialized(self.driver) except exception.DriverNotInitialized: with excutils.save_and_reraise_exception(): - self.db.volume_update(ctxt, volume_id, - {'migration_status': 'error'}) + volume.migration_status = 'error' + volume.save() - volume_ref = self.db.volume_get(ctxt, volume_id) model_update = None moved = False status_update = None - if volume_ref['status'] in ('retyping', 'maintenance'): - status_update = {'status': volume_ref['previous_status']} + if volume.status in ('retyping', 'maintenance'): + status_update = {'status': volume.previous_status} - self.db.volume_update(ctxt, volume_ref['id'], - {'migration_status': 'migrating'}) + volume.migration_status = 'migrating' + volume.save() if not force_host_copy and new_type_id is None: try: - LOG.debug("Issue driver.migrate_volume.", resource=volume_ref) + LOG.debug("Issue driver.migrate_volume.", resource=volume) moved, model_update = self.driver.migrate_volume(ctxt, - volume_ref, + volume, host) if moved: updates = {'host': host['host'], 'migration_status': 'success', - 'previous_status': volume_ref['status']} + 'previous_status': volume.status} if status_update: updates.update(status_update) if model_update: updates.update(model_update) - volume_ref = self.db.volume_update(ctxt, - volume_ref['id'], - updates) + volume.update(updates) + volume.save() except Exception: with excutils.save_and_reraise_exception(): updates = {'migration_status': 'error'} if status_update: updates.update(status_update) - self.db.volume_update(ctxt, volume_ref['id'], updates) + volume.update(updates) + volume.save() if not moved: try: - self._migrate_volume_generic(ctxt, volume_ref, host, + self._migrate_volume_generic(ctxt, volume, host, new_type_id) except Exception: with excutils.save_and_reraise_exception(): updates = {'migration_status': 'error'} if status_update: updates.update(status_update) - self.db.volume_update(ctxt, volume_ref['id'], updates) + volume.update(updates) + volume.save() LOG.info(_LI("Migrate volume completed successfully."), - resource=volume_ref) + resource=volume) @periodic_task.periodic_task def _report_driver_status(self, context): @@ -3088,14 +3102,16 @@ class VolumeManager(manager.SchedulerDependentManager): def update_migrated_volume(self, ctxt, volume, new_volume, volume_status): """Finalize migration process on backend device.""" + # FIXME(thangp): Remove this in v2.0 of RPC API. + if (not isinstance(volume, objects.Volume) or + not isinstance(new_volume, objects.Volume)): + volume = objects.Volume.get_by_id(ctxt, volume['id']) + new_volume = objects.Volume.get_by_id(ctxt, new_volume['id']) + model_update = None - # This is temporary fix for bug 1491210. - volume = self.db.volume_get(ctxt, volume['id']) - new_volume = self.db.volume_get(ctxt, new_volume['id']) - model_update_default = {'_name_id': new_volume['_name_id'] or - new_volume['id'], + model_update_default = {'_name_id': new_volume.name_id, 'provider_location': - new_volume['provider_location']} + new_volume.provider_location} try: model_update = self.driver.update_migrated_volume(ctxt, volume, @@ -3119,17 +3135,19 @@ class VolumeManager(manager.SchedulerDependentManager): if volume.get('volume_metadata'): model_update_new[key] = { metadata['key']: metadata['value'] - for metadata in volume.get('volume_metadata')} + for metadata in volume.volume_metadata} elif key == 'admin_metadata': model_update_new[key] = { metadata['key']: metadata['value'] - for metadata in volume.get('volume_admin_metadata')} + for metadata in volume.volume_admin_metadata} else: model_update_new[key] = volume[key] - self.db.volume_update(ctxt.elevated(), new_volume['id'], - model_update_new) - self.db.volume_update(ctxt.elevated(), volume['id'], - model_update_default) + with new_volume.obj_as_admin(): + new_volume.update(model_update_new) + new_volume.save() + with volume.obj_as_admin(): + volume.update(model_update_default) + volume.save() # Replication V2 methods def enable_replication(self, context, volume): diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index c81e78341be..035b4d59556 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -83,6 +83,8 @@ class VolumeAPI(object): 1.33 - Adds support for sending objects over RPC in delete_volume(). 1.34 - Adds support for sending objects over RPC in retype(). 1.35 - Adds support for sending objects over RPC in extend_volume(). + 1.36 - Adds support for sending objects over RPC in migrate_volume(), + migrate_volume_completion(), and update_migrated_volume(). """ BASE_RPC_API_VERSION = '1.0' @@ -246,20 +248,35 @@ class VolumeAPI(object): cctxt.cast(ctxt, 'extend_volume', **msg_args) def migrate_volume(self, ctxt, volume, dest_host, force_host_copy): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.8') + new_host = utils.extract_host(volume.host) host_p = {'host': dest_host.host, 'capabilities': dest_host.capabilities} - cctxt.cast(ctxt, 'migrate_volume', volume_id=volume['id'], - host=host_p, force_host_copy=force_host_copy) + + msg_args = {'volume_id': volume.id, 'host': host_p, + 'force_host_copy': force_host_copy} + if self.client.can_send_version('1.36'): + version = '1.36' + msg_args['volume'] = volume + else: + version = '1.8' + + cctxt = self.client.prepare(server=new_host, version=version) + cctxt.cast(ctxt, 'migrate_volume', **msg_args) def migrate_volume_completion(self, ctxt, volume, new_volume, error): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.10') - return cctxt.call(ctxt, 'migrate_volume_completion', - volume_id=volume['id'], - new_volume_id=new_volume['id'], - error=error) + new_host = utils.extract_host(volume.host) + + msg_args = {'volume_id': volume.id, 'new_volume_id': new_volume.id, + 'error': error} + if self.client.can_send_version('1.36'): + version = '1.36' + msg_args['volume'] = volume + msg_args['new_volume'] = new_volume + else: + version = '1.10' + + cctxt = self.client.prepare(server=new_host, version=version) + return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args) def retype(self, ctxt, volume, new_type_id, dest_host, migration_policy='never', reservations=None): @@ -296,7 +313,7 @@ class VolumeAPI(object): def update_migrated_volume(self, ctxt, volume, new_volume, original_volume_status): host = utils.extract_host(new_volume['host']) - cctxt = self.client.prepare(server=host, version='1.19') + cctxt = self.client.prepare(server=host, version='1.36') cctxt.call(ctxt, 'update_migrated_volume', volume=volume,