Merge "Update migrate_volume API to use versionedobjects"

This commit is contained in:
Jenkins
2015-12-09 05:52:03 +00:00
committed by Gerrit Code Review
9 changed files with 338 additions and 224 deletions

View File

@@ -56,7 +56,7 @@ LOG = logging.getLogger(__name__)
class SchedulerManager(manager.Manager):
"""Chooses a host to create volumes."""
RPC_API_VERSION = '1.10'
RPC_API_VERSION = '1.11'
target = messaging.Target(version=RPC_API_VERSION)
@@ -148,13 +148,18 @@ class SchedulerManager(manager.Manager):
def migrate_volume_to_host(self, context, topic, volume_id, host,
force_host_copy, request_spec,
filter_properties=None):
filter_properties=None, volume=None):
"""Ensure that the host exists and can accept the volume."""
self._wait_for_scheduler()
# FIXME(thangp): Remove this in v2.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the
# volume by its volume_id.
volume = objects.Volume.get_by_id(context, volume_id)
def _migrate_volume_set_error(self, context, ex, request_spec):
volume = db.volume_get(context, request_spec['volume_id'])
if volume.status == 'maintenance':
previous_status = (
volume.previous_status or 'maintenance')
@@ -176,8 +181,7 @@ class SchedulerManager(manager.Manager):
with excutils.save_and_reraise_exception():
_migrate_volume_set_error(self, context, ex, request_spec)
else:
volume_ref = db.volume_get(context, volume_id)
volume_rpcapi.VolumeAPI().migrate_volume(context, volume_ref,
volume_rpcapi.VolumeAPI().migrate_volume(context, volume,
tgt_host,
force_host_copy)

View File

@@ -44,6 +44,8 @@ class SchedulerAPI(object):
1.8 - Add sending object over RPC in create_consistencygroup method
1.9 - Adds support for sending objects over RPC in create_volume()
1.10 - Adds support for sending objects over RPC in retype()
1.11 - Adds support for sending objects over RPC in
migrate_volume_to_host()
"""
RPC_API_VERSION = '1.0'
@@ -95,17 +97,20 @@ class SchedulerAPI(object):
def migrate_volume_to_host(self, ctxt, topic, volume_id, host,
force_host_copy=False, request_spec=None,
filter_properties=None):
cctxt = self.client.prepare(version='1.3')
filter_properties=None, volume=None):
request_spec_p = jsonutils.to_primitive(request_spec)
return cctxt.cast(ctxt, 'migrate_volume_to_host',
topic=topic,
volume_id=volume_id,
host=host,
force_host_copy=force_host_copy,
request_spec=request_spec_p,
filter_properties=filter_properties)
msg_args = {'topic': topic, 'volume_id': volume_id,
'host': host, 'force_host_copy': force_host_copy,
'request_spec': request_spec_p,
'filter_properties': filter_properties}
if self.client.can_send_version('1.11'):
version = '1.11'
msg_args['volume'] = volume
else:
version = '1.3'
cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'migrate_volume_to_host', **msg_args)
def retype(self, ctxt, topic, volume_id,
request_spec=None, filter_properties=None, volume=None):

View File

@@ -801,7 +801,7 @@ class AdminActionsTest(test.TestCase):
force_host_copy=False):
admin_ctx = context.get_admin_context()
# build request to migrate to host
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume.id)
req.method = 'POST'
req.headers['content-type'] = 'application/json'
body = {'os-migrate_volume': {'host': host,
@@ -811,7 +811,7 @@ class AdminActionsTest(test.TestCase):
resp = req.get_response(app())
# verify status
self.assertEqual(expected_status, resp.status_int)
volume = db.volume_get(admin_ctx, volume['id'])
volume = objects.Volume.get_by_id(admin_ctx, volume.id)
return volume
def test_migrate_volume_success(self):

View File

@@ -117,7 +117,9 @@ class SchedulerRpcAPITestCase(test.TestCase):
version='1.2')
can_send_version.assert_called_once_with('1.9')
def test_migrate_volume_to_host(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
def test_migrate_volume_to_host(self, can_send_version):
self._test_scheduler_api('migrate_volume_to_host',
rpc_method='cast',
topic='topic',
@@ -126,7 +128,24 @@ class SchedulerRpcAPITestCase(test.TestCase):
force_host_copy=True,
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
version='1.11')
can_send_version.assert_called_once_with('1.11')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
def test_migrate_volume_to_host_old(self, can_send_version):
self._test_scheduler_api('migrate_volume_to_host',
rpc_method='cast',
topic='topic',
volume_id='volume_id',
host='host',
force_host_copy=True,
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
version='1.3')
can_send_version.assert_called_once_with('1.11')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)

View File

@@ -4183,22 +4183,20 @@ class VolumeTestCase(BaseVolumeTestCase):
def test_clean_temporary_volume(self):
def fake_delete_volume(ctxt, volume):
db.volume_destroy(ctxt, volume['id'])
volume.destroy()
fake_volume = tests_utils.create_volume(self.context, size=1,
host=CONF.host)
host=CONF.host,
migration_status='migrating')
fake_new_volume = tests_utils.create_volume(self.context, size=1,
host=CONF.host)
# Check when the migrated volume is in migration
db.volume_update(self.context, fake_volume['id'],
{'migration_status': 'migrating'})
# 1. Only clean the db
self.volume._clean_temporary_volume(self.context, fake_volume['id'],
fake_new_volume['id'],
self.volume._clean_temporary_volume(self.context, fake_volume,
fake_new_volume,
clean_db_only=True)
self.assertRaises(exception.VolumeNotFound,
db.volume_get, self.context,
fake_new_volume['id'])
fake_new_volume.id)
# 2. Delete the backend storage
fake_new_volume = tests_utils.create_volume(self.context, size=1,
@@ -4207,23 +4205,23 @@ class VolumeTestCase(BaseVolumeTestCase):
mock_delete_volume:
mock_delete_volume.side_effect = fake_delete_volume
self.volume._clean_temporary_volume(self.context,
fake_volume['id'],
fake_new_volume['id'],
fake_volume,
fake_new_volume,
clean_db_only=False)
self.assertRaises(exception.VolumeNotFound,
db.volume_get, self.context,
fake_new_volume['id'])
fake_new_volume.id)
# Check when the migrated volume is not in migration
fake_new_volume = tests_utils.create_volume(self.context, size=1,
host=CONF.host)
db.volume_update(self.context, fake_volume['id'],
{'migration_status': 'non-migrating'})
self.volume._clean_temporary_volume(self.context, fake_volume['id'],
fake_new_volume['id'])
fake_volume.migration_status = 'non-migrating'
fake_volume.save()
self.volume._clean_temporary_volume(self.context, fake_volume,
fake_new_volume)
volume = db.volume_get(context.get_admin_context(),
fake_new_volume['id'])
self.assertIsNone(volume['migration_status'])
fake_new_volume.id)
self.assertIsNone(volume.migration_status)
def test_update_volume_readonly_flag(self):
"""Test volume readonly flag can be updated at API level."""
@@ -4323,13 +4321,14 @@ class VolumeMigrationTestCase(VolumeTestCase):
host=CONF.host,
migration_status='migrating')
host_obj = {'host': 'newhost', 'capabilities': {}}
self.volume.migrate_volume(self.context, volume['id'],
host_obj, False)
self.volume.migrate_volume(self.context, volume.id, host_obj, False,
volume=volume)
# check volume properties
volume = db.volume_get(context.get_admin_context(), volume['id'])
self.assertEqual('newhost', volume['host'])
self.assertEqual('success', volume['migration_status'])
volume = objects.Volume.get_by_id(context.get_admin_context(),
volume.id)
self.assertEqual('newhost', volume.host)
self.assertEqual('success', volume.migration_status)
def _fake_create_volume(self, ctxt, volume, host, req_spec, filters,
allow_reschedule=True):
@@ -4351,12 +4350,14 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.assertRaises(processutils.ProcessExecutionError,
self.volume.migrate_volume,
self.context,
volume['id'],
volume.id,
host_obj,
False)
volume = db.volume_get(context.get_admin_context(), volume['id'])
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
False,
volume=volume)
volume = objects.Volume.get_by_id(context.get_admin_context(),
volume.id)
self.assertEqual('error', volume.migration_status)
self.assertEqual('available', volume.status)
@mock.patch('cinder.compute.API')
@mock.patch('cinder.volume.manager.VolumeManager.'
@@ -4366,7 +4367,10 @@ class VolumeMigrationTestCase(VolumeTestCase):
migrate_volume_completion,
nova_api):
fake_volume_id = 'fake_volume_id'
fake_new_volume = {'status': 'available', 'id': fake_volume_id}
fake_db_new_volume = {'status': 'available', 'id': fake_volume_id}
fake_new_volume = fake_volume.fake_db_volume(**fake_db_new_volume)
new_volume_obj = fake_volume.fake_volume_obj(self.context,
**fake_new_volume)
host_obj = {'host': 'newhost', 'capabilities': {}}
volume_get.return_value = fake_new_volume
update_server_volume = nova_api.return_value.update_server_volume
@@ -4377,12 +4381,10 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.volume._migrate_volume_generic(self.context, volume,
host_obj, None)
mock_copy_volume.assert_called_with(self.context, volume,
fake_new_volume,
new_volume_obj,
remote='dest')
migrate_volume_completion.assert_called_with(self.context,
volume['id'],
fake_new_volume['id'],
error=False)
migrate_volume_completion.assert_called_with(
self.context, volume.id, new_volume_obj.id, error=False)
self.assertFalse(update_server_volume.called)
@mock.patch('cinder.compute.API')
@@ -4421,6 +4423,7 @@ class VolumeMigrationTestCase(VolumeTestCase):
rpc_delete_volume,
update_migrated_volume):
fake_volume = tests_utils.create_volume(self.context, size=1,
previous_status='available',
host=CONF.host)
host_obj = {'host': 'newhost', 'capabilities': {}}
@@ -4430,12 +4433,12 @@ class VolumeMigrationTestCase(VolumeTestCase):
mock.patch.object(self.volume.driver, 'delete_volume') as \
delete_volume:
create_volume.side_effect = self._fake_create_volume
self.volume.migrate_volume(self.context, fake_volume['id'],
host_obj, True)
volume = db.volume_get(context.get_admin_context(),
fake_volume['id'])
self.assertEqual('newhost', volume['host'])
self.assertEqual('success', volume['migration_status'])
self.volume.migrate_volume(self.context, fake_volume.id,
host_obj, True, volume=fake_volume)
volume = objects.Volume.get_by_id(context.get_admin_context(),
fake_volume.id)
self.assertEqual('newhost', volume.host)
self.assertEqual('success', volume.migration_status)
self.assertFalse(mock_migrate_volume.called)
self.assertFalse(delete_volume.called)
self.assertTrue(rpc_delete_volume.called)
@@ -4461,12 +4464,14 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.assertRaises(processutils.ProcessExecutionError,
self.volume.migrate_volume,
self.context,
volume['id'],
volume.id,
host_obj,
True)
volume = db.volume_get(context.get_admin_context(), volume['id'])
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
True,
volume=volume)
volume = objects.Volume.get_by_id(context.get_admin_context(),
volume.id)
self.assertEqual('error', volume.migration_status)
self.assertEqual('available', volume.status)
@mock.patch('cinder.db.volume_update')
def test_update_migrated_volume(self, volume_update):
@@ -4474,7 +4479,8 @@ class VolumeMigrationTestCase(VolumeTestCase):
fake_new_host = 'fake_new_host'
fake_update = {'_name_id': 'updated_id',
'provider_location': 'updated_location'}
fake_elevated = 'fake_elevated'
fake_elevated = context.RequestContext('fake', self.project_id,
is_admin=True)
volume = tests_utils.create_volume(self.context, size=1,
status='available',
host=fake_host)
@@ -4484,13 +4490,13 @@ class VolumeMigrationTestCase(VolumeTestCase):
provider_location='fake_provider_location',
_name_id='fake_name_id',
host=fake_new_host)
new_volume['_name_id'] = 'fake_name_id'
new_volume['provider_location'] = 'fake_provider_location'
fake_update_error = {'_name_id': new_volume['_name_id'],
new_volume._name_id = 'fake_name_id'
new_volume.provider_location = 'fake_provider_location'
fake_update_error = {'_name_id': new_volume._name_id,
'provider_location':
new_volume['provider_location']}
expected_update = {'_name_id': volume['_name_id'],
'provider_location': volume['provider_location']}
new_volume.provider_location}
expected_update = {'_name_id': volume._name_id,
'provider_location': volume.provider_location}
with mock.patch.object(self.volume.driver,
'update_migrated_volume') as migrate_update,\
mock.patch.object(self.context, 'elevated') as elevated:
@@ -4499,19 +4505,23 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.volume.update_migrated_volume(self.context, volume,
new_volume, 'available')
volume_update.assert_has_calls((
mock.call(fake_elevated, new_volume['id'], expected_update),
mock.call(fake_elevated, volume['id'], fake_update)))
mock.call(fake_elevated, new_volume.id, expected_update),
mock.call(fake_elevated, volume.id, fake_update)))
# Test the case for update_migrated_volume not implemented
# for the driver.
migrate_update.reset_mock()
volume_update.reset_mock()
# Reset the volume objects to their original value, since they
# were changed in the last call.
new_volume._name_id = 'fake_name_id'
new_volume.provider_location = 'fake_provider_location'
migrate_update.side_effect = NotImplementedError
self.volume.update_migrated_volume(self.context, volume,
new_volume, 'available')
volume_update.assert_has_calls((
mock.call(fake_elevated, new_volume['id'], expected_update),
mock.call(fake_elevated, volume['id'], fake_update_error)))
mock.call(fake_elevated, new_volume.id, fake_update),
mock.call(fake_elevated, volume.id, fake_update_error)))
def test_migrate_volume_generic_create_volume_error(self):
self.expected_status = 'error'
@@ -4530,10 +4540,12 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.assertRaises(exception.VolumeMigrationFailed,
self.volume.migrate_volume,
self.context,
volume['id'],
volume.id,
host_obj,
True)
volume = db.volume_get(context.get_admin_context(), volume['id'])
True,
volume=volume)
volume = objects.Volume.get_by_id(context.get_admin_context(),
volume.id)
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
self.assertTrue(clean_temporary_volume.called)
@@ -4558,10 +4570,12 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.assertRaises(exception.VolumeMigrationFailed,
self.volume.migrate_volume,
self.context,
volume['id'],
volume.id,
host_obj,
True)
volume = db.volume_get(context.get_admin_context(), volume['id'])
True,
volume=volume)
volume = objects.Volume.get_by_id(context.get_admin_context(),
volume.id)
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
self.assertTrue(clean_temporary_volume.called)
@@ -4588,10 +4602,12 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.assertRaises(processutils.ProcessExecutionError,
self.volume.migrate_volume,
self.context,
volume['id'],
volume.id,
host_obj,
True)
volume = db.volume_get(context.get_admin_context(), volume['id'])
True,
volume=volume)
volume = objects.Volume.get_by_id(context.get_admin_context(),
volume.id)
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
@@ -4634,9 +4650,10 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.assertRaises(processutils.ProcessExecutionError,
self.volume.migrate_volume,
self.context,
volume['id'],
volume.id,
host_obj,
True)
True,
volume=volume)
volume = db.volume_get(context.get_admin_context(), volume['id'])
self.assertEqual('error', volume['migration_status'])
self.assertEqual('available', volume['status'])
@@ -4649,7 +4666,7 @@ class VolumeMigrationTestCase(VolumeTestCase):
previous_status='available'):
def fake_attach_volume(ctxt, volume, instance_uuid, host_name,
mountpoint, mode):
tests_utils.attach_volume(ctxt, volume['id'],
tests_utils.attach_volume(ctxt, volume.id,
instance_uuid, host_name,
'/dev/vda')
@@ -4661,12 +4678,12 @@ class VolumeMigrationTestCase(VolumeTestCase):
previous_status=previous_status)
attachment_id = None
if status == 'in-use':
vol = tests_utils.attach_volume(self.context, old_volume['id'],
vol = tests_utils.attach_volume(self.context, old_volume.id,
instance_uuid, attached_host,
'/dev/vda')
self.assertEqual('in-use', vol['status'])
attachment_id = vol['volume_attachment'][0]['id']
target_status = 'target:%s' % old_volume['id']
target_status = 'target:%s' % old_volume.id
new_host = CONF.host + 'new'
new_volume = tests_utils.create_volume(self.context, size=0,
host=new_host,
@@ -4681,16 +4698,18 @@ class VolumeMigrationTestCase(VolumeTestCase):
'update_migrated_volume'),\
mock.patch.object(self.volume.driver, 'attach_volume'):
mock_attach_volume.side_effect = fake_attach_volume
self.volume.migrate_volume_completion(self.context, old_volume[
'id'], new_volume['id'])
after_new_volume = db.volume_get(self.context, new_volume.id)
after_old_volume = db.volume_get(self.context, old_volume.id)
self.volume.migrate_volume_completion(self.context, old_volume.id,
new_volume.id)
after_new_volume = objects.Volume.get_by_id(self.context,
new_volume.id)
after_old_volume = objects.Volume.get_by_id(self.context,
old_volume.id)
if status == 'in-use':
mock_detach_volume.assert_called_with(self.context,
old_volume['id'],
old_volume.id,
attachment_id)
attachment = db.volume_attachment_get_by_instance_uuid(
self.context, old_volume['id'], instance_uuid)
self.context, old_volume.id, instance_uuid)
self.assertIsNotNone(attachment)
self.assertEqual(attached_host, attachment['attached_host'])
self.assertEqual(instance_uuid, attachment['instance_uuid'])
@@ -4865,10 +4884,11 @@ class VolumeMigrationTestCase(VolumeTestCase):
self.volume.driver._initialized = False
self.assertRaises(exception.DriverNotInitialized,
self.volume.migrate_volume,
self.context, volume['id'],
host_obj, True)
self.context, volume.id, host_obj, True,
volume=volume)
volume = db.volume_get(context.get_admin_context(), volume['id'])
volume = objects.Volume.get_by_id(context.get_admin_context(),
volume.id)
self.assertEqual('error', volume.migration_status)
# lets cleanup the mess.

View File

@@ -146,7 +146,6 @@ class VolumeRpcAPITestCase(test.TestCase):
expected_msg['host'] = dest_host_dict
if 'new_volume' in expected_msg:
volume = expected_msg['new_volume']
del expected_msg['new_volume']
expected_msg['new_volume_id'] = volume['id']
if 'host' in kwargs:
@@ -392,7 +391,9 @@ class VolumeRpcAPITestCase(test.TestCase):
version='1.14')
can_send_version.assert_called_once_with('1.35')
def test_migrate_volume(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
def test_migrate_volume(self, can_send_version):
class FakeHost(object):
def __init__(self):
self.host = 'host'
@@ -400,18 +401,49 @@ class VolumeRpcAPITestCase(test.TestCase):
dest_host = FakeHost()
self._test_volume_api('migrate_volume',
rpc_method='cast',
volume=self.fake_volume,
volume=self.fake_volume_obj,
dest_host=dest_host,
force_host_copy=True,
version='1.36')
can_send_version.assert_called_once_with('1.36')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
def test_migrate_volume_old(self, can_send_version):
class FakeHost(object):
def __init__(self):
self.host = 'host'
self.capabilities = {}
dest_host = FakeHost()
self._test_volume_api('migrate_volume',
rpc_method='cast',
volume=self.fake_volume_obj,
dest_host=dest_host,
force_host_copy=True,
version='1.8')
can_send_version.assert_called_once_with('1.36')
def test_migrate_volume_completion(self):
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
def test_migrate_volume_completion(self, can_send_version):
self._test_volume_api('migrate_volume_completion',
rpc_method='call',
volume=self.fake_volume,
new_volume=self.fake_volume,
volume=self.fake_volume_obj,
new_volume=self.fake_volume_obj,
error=False,
version='1.36')
can_send_version.assert_called_once_with('1.36')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
def test_migrate_volume_completion_old(self, can_send_version):
self._test_volume_api('migrate_volume_completion',
rpc_method='call',
volume=self.fake_volume_obj,
new_volume=self.fake_volume_obj,
error=False,
version='1.10')
can_send_version.assert_called_once_with('1.36')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)

View File

@@ -1275,39 +1275,38 @@ class API(base.Base):
lock_volume):
"""Migrate the volume to the specified host."""
if volume['status'] not in ['available', 'in-use']:
if volume.status not in ['available', 'in-use']:
msg = _('Volume %(vol_id)s status must be available or in-use, '
'but current status is: '
'%(vol_status)s.') % {'vol_id': volume['id'],
'vol_status': volume['status']}
'%(vol_status)s.') % {'vol_id': volume.id,
'vol_status': volume.status}
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
# Make sure volume is not part of a migration.
if self._is_volume_migrating(volume):
msg = _("Volume %s is already part of an active "
"migration.") % volume['id']
"migration.") % volume.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
# We only handle volumes without snapshots for now
snaps = objects.SnapshotList.get_all_for_volume(context, volume['id'])
snaps = objects.SnapshotList.get_all_for_volume(context, volume.id)
if snaps:
msg = _("Volume %s must not have snapshots.") % volume['id']
msg = _("Volume %s must not have snapshots.") % volume.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
# We only handle non-replicated volumes for now
rep_status = volume['replication_status']
if rep_status is not None and rep_status != 'disabled':
msg = _("Volume %s must not be replicated.") % volume['id']
if (volume.replication_status is not None and
volume.replication_status != 'disabled'):
msg = _("Volume %s must not be replicated.") % volume.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
cg_id = volume.get('consistencygroup_id', None)
if cg_id:
if volume.consistencygroup_id:
msg = _("Volume %s must not be part of a consistency "
"group.") % volume['id']
"group.") % volume.id
LOG.error(msg)
raise exception.InvalidVolume(reason=msg)
@@ -1327,7 +1326,7 @@ class API(base.Base):
raise exception.InvalidHost(reason=msg)
# Make sure the destination host is different than the current one
if host == volume['host']:
if host == volume.host:
msg = _('Destination host must be different '
'than the current host.')
LOG.error(msg)
@@ -1340,27 +1339,27 @@ class API(base.Base):
# that this volume is in maintenance mode, and no action is allowed
# on this volume, e.g. attach, detach, retype, migrate, etc.
updates = {'migration_status': 'starting',
'previous_status': volume['status']}
if lock_volume and volume['status'] == 'available':
'previous_status': volume.status}
if lock_volume and volume.status == 'available':
updates['status'] = 'maintenance'
self.update(context, volume, updates)
# Call the scheduler to ensure that the host exists and that it can
# accept the volume
volume_type = {}
volume_type_id = volume['volume_type_id']
if volume_type_id:
if volume.volume_type_id:
volume_type = volume_types.get_volume_type(context.elevated(),
volume_type_id)
volume.volume_type_id)
request_spec = {'volume_properties': volume,
'volume_type': volume_type,
'volume_id': volume['id']}
'volume_id': volume.id}
self.scheduler_rpcapi.migrate_volume_to_host(context,
CONF.volume_topic,
volume['id'],
volume.id,
host,
force_host_copy,
request_spec)
request_spec,
volume=volume)
LOG.info(_LI("Migrate volume request issued successfully."),
resource=volume)
@@ -1368,34 +1367,34 @@ class API(base.Base):
def migrate_volume_completion(self, context, volume, new_volume, error):
# This is a volume swap initiated by Nova, not Cinder. Nova expects
# us to return the new_volume_id.
if not (volume['migration_status'] or new_volume['migration_status']):
if not (volume.migration_status or new_volume.migration_status):
# Don't need to do migration, but still need to finish the
# volume attach and detach so volumes don't end in 'attaching'
# and 'detaching' state
attachments = volume['volume_attachment']
attachments = volume.volume_attachment
for attachment in attachments:
self.detach(context, volume, attachment['id'])
self.detach(context, volume, attachment.id)
self.attach(context, new_volume,
attachment['instance_uuid'],
attachment['attached_host'],
attachment['mountpoint'],
attachment.instance_uuid,
attachment.attached_host,
attachment.mountpoint,
'rw')
return new_volume['id']
return new_volume.id
if not volume['migration_status']:
if not volume.migration_status:
msg = _('Source volume not mid-migration.')
raise exception.InvalidVolume(reason=msg)
if not new_volume['migration_status']:
if not new_volume.migration_status:
msg = _('Destination volume not mid-migration.')
raise exception.InvalidVolume(reason=msg)
expected_status = 'target:%s' % volume['id']
if not new_volume['migration_status'] == expected_status:
expected_status = 'target:%s' % volume.id
if not new_volume.migration_status == expected_status:
msg = (_('Destination has migration_status %(stat)s, expected '
'%(exp)s.') % {'stat': new_volume['migration_status'],
'%(exp)s.') % {'stat': new_volume.migration_status,
'exp': expected_status})
raise exception.InvalidVolume(reason=msg)

View File

@@ -197,7 +197,7 @@ def locked_snapshot_operation(f):
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
RPC_API_VERSION = '1.35'
RPC_API_VERSION = '1.36'
target = messaging.Target(version=RPC_API_VERSION)
@@ -1626,35 +1626,38 @@ class VolumeManager(manager.SchedulerDependentManager):
# Wait for new_volume to become ready
starttime = time.time()
deadline = starttime + CONF.migration_create_volume_timeout_secs
new_volume = self.db.volume_get(ctxt, new_volume['id'])
# TODO(thangp): Replace get_by_id with refresh when it is available
new_volume = objects.Volume.get_by_id(ctxt, new_volume.id)
tries = 0
while new_volume['status'] != 'available':
while new_volume.status != 'available':
tries += 1
now = time.time()
if new_volume['status'] == 'error':
if new_volume.status == 'error':
msg = _("failed to create new_volume on destination host")
self._clean_temporary_volume(ctxt, volume['id'],
new_volume['id'],
self._clean_temporary_volume(ctxt, volume,
new_volume,
clean_db_only=True)
raise exception.VolumeMigrationFailed(reason=msg)
elif now > deadline:
msg = _("timeout creating new_volume on destination host")
self._clean_temporary_volume(ctxt, volume['id'],
new_volume['id'],
self._clean_temporary_volume(ctxt, volume,
new_volume,
clean_db_only=True)
raise exception.VolumeMigrationFailed(reason=msg)
else:
time.sleep(tries ** 2)
new_volume = self.db.volume_get(ctxt, new_volume['id'])
# TODO(thangp): Replace get_by_id with refresh when it is
# available
new_volume = objects.Volume.get_by_id(ctxt, new_volume.id)
# Copy the source volume to the destination volume
try:
attachments = volume['volume_attachment']
attachments = volume.volume_attachment
if not attachments:
self._copy_volume_data(ctxt, volume, new_volume, remote='dest')
# The above call is synchronous so we complete the migration
self.migrate_volume_completion(ctxt, volume['id'],
new_volume['id'],
self.migrate_volume_completion(ctxt, volume.id,
new_volume.id,
error=False)
else:
nova_api = compute.API()
@@ -1663,58 +1666,63 @@ class VolumeManager(manager.SchedulerDependentManager):
for attachment in attachments:
instance_uuid = attachment['instance_uuid']
nova_api.update_server_volume(ctxt, instance_uuid,
volume['id'],
new_volume['id'])
volume.id,
new_volume.id)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Failed to copy volume %(vol1)s to %(vol2)s"),
{'vol1': volume['id'], 'vol2': new_volume['id']})
self._clean_temporary_volume(ctxt, volume['id'],
new_volume['id'])
{'vol1': volume.id, 'vol2': new_volume.id})
self._clean_temporary_volume(ctxt, volume,
new_volume)
def _clean_temporary_volume(self, ctxt, volume_id, new_volume_id,
def _clean_temporary_volume(self, ctxt, volume, new_volume,
clean_db_only=False):
volume = self.db.volume_get(ctxt, volume_id)
# If we're in the migrating phase, we need to cleanup
# destination volume because source volume is remaining
if volume['migration_status'] == 'migrating':
if volume.migration_status == 'migrating':
try:
if clean_db_only:
# The temporary volume is not created, only DB data
# is created
self.db.volume_destroy(ctxt, new_volume_id)
new_volume.destroy()
else:
# The temporary volume is already created
rpcapi = volume_rpcapi.VolumeAPI()
volume = self.db.volume_get(ctxt, new_volume_id)
rpcapi.delete_volume(ctxt, volume)
rpcapi.delete_volume(ctxt, new_volume)
except exception.VolumeNotFound:
LOG.info(_LI("Couldn't find the temporary volume "
"%(vol)s in the database. There is no need "
"to clean up this volume."),
{'vol': new_volume_id})
{'vol': new_volume.id})
else:
# If we're in the completing phase don't delete the
# destination because we may have already deleted the
# source! But the migration_status in database should
# be cleared to handle volume after migration failure
try:
updates = {'migration_status': None}
self.db.volume_update(ctxt, new_volume_id, updates)
new_volume.migration_status = None
new_volume.save()
except exception.VolumeNotFound:
LOG.info(_LI("Couldn't find destination volume "
"%(vol)s in the database. The entry might be "
"successfully deleted during migration "
"completion phase."),
{'vol': new_volume_id})
{'vol': new_volume.id})
LOG.warning(_LW("Failed to migrate volume. The destination "
"volume %(vol)s is not deleted since the "
"source volume may have been deleted."),
{'vol': new_volume_id})
{'vol': new_volume.id})
def migrate_volume_completion(self, ctxt, volume_id, new_volume_id,
error=False):
error=False, volume=None, new_volume=None):
# FIXME(thangp): Remove this in v2.0 of RPC API.
if volume is None or new_volume is None:
# For older clients, mimic the old behavior and look up the volume
# by its volume_id.
volume = objects.Volume.get_by_id(ctxt, volume_id)
new_volume = objects.Volume.get_by_id(ctxt, new_volume_id)
try:
# NOTE(flaper87): Verify the driver is enabled
# before going forward. The exception will be caught
@@ -1722,37 +1730,36 @@ class VolumeManager(manager.SchedulerDependentManager):
utils.require_driver_initialized(self.driver)
except exception.DriverNotInitialized:
with excutils.save_and_reraise_exception():
self.db.volume_update(ctxt, volume_id,
{'migration_status': 'error'})
volume.migration_status = 'error'
volume.save()
LOG.debug("migrate_volume_completion: completing migration for "
"volume %(vol1)s (temporary volume %(vol2)s",
{'vol1': volume_id, 'vol2': new_volume_id})
volume = self.db.volume_get(ctxt, volume_id)
new_volume = self.db.volume_get(ctxt, new_volume_id)
{'vol1': volume.id, 'vol2': new_volume.id})
rpcapi = volume_rpcapi.VolumeAPI()
orig_volume_status = volume['previous_status']
orig_volume_status = volume.previous_status
if error:
LOG.info(_LI("migrate_volume_completion is cleaning up an error "
"for volume %(vol1)s (temporary volume %(vol2)s"),
{'vol1': volume['id'], 'vol2': new_volume['id']})
{'vol1': volume['id'], 'vol2': new_volume.id})
rpcapi.delete_volume(ctxt, new_volume)
updates = {'migration_status': 'error',
'status': orig_volume_status}
self.db.volume_update(ctxt, volume_id, updates)
return volume_id
volume.update(updates)
volume.save()
return volume.id
self.db.volume_update(ctxt, volume_id,
{'migration_status': 'completing'})
volume.migration_status = 'completing'
volume.save()
# Detach the source volume (if it fails, don't fail the migration)
try:
if orig_volume_status == 'in-use':
attachments = volume['volume_attachment']
attachments = volume.volume_attachment
for attachment in attachments:
self.detach_volume(ctxt, volume_id, attachment['id'])
self.detach_volume(ctxt, volume.id, attachment['id'])
except Exception as ex:
LOG.error(_LE("Detach migration source volume failed: %(err)s"),
{'err': ex}, resource=volume)
@@ -1767,20 +1774,21 @@ class VolumeManager(manager.SchedulerDependentManager):
# Swap src and dest DB records so we can continue using the src id and
# asynchronously delete the destination id
__, updated_new = self.db.finish_volume_migration(
ctxt, volume_id, new_volume_id)
ctxt, volume.id, new_volume.id)
updates = {'status': orig_volume_status,
'previous_status': volume['status'],
'previous_status': volume.status,
'migration_status': 'success'}
if orig_volume_status == 'in-use':
attachments = volume['volume_attachment']
attachments = volume.volume_attachment
for attachment in attachments:
rpcapi.attach_volume(ctxt, volume,
attachment['instance_uuid'],
attachment['attached_host'],
attachment['mountpoint'],
'rw')
self.db.volume_update(ctxt, volume_id, updates)
volume.update(updates)
volume.save()
# Asynchronous deletion of the source volume in the back-end (now
# pointed by the target volume id)
@@ -1789,15 +1797,21 @@ class VolumeManager(manager.SchedulerDependentManager):
except Exception as ex:
LOG.error(_LE('Failed to request async delete of migration source '
'vol %(vol)s: %(err)s'),
{'vol': volume_id, 'err': ex})
{'vol': volume.id, 'err': ex})
LOG.info(_LI("Complete-Migrate volume completed successfully."),
resource=volume)
return volume['id']
return volume.id
def migrate_volume(self, ctxt, volume_id, host, force_host_copy=False,
new_type_id=None):
new_type_id=None, volume=None):
"""Migrate the volume to the specified host (called on source host)."""
# FIXME(thangp): Remove this in v2.0 of RPC API.
if volume is None:
# For older clients, mimic the old behavior and look up the volume
# by its volume_id.
volume = objects.Volume.get_by_id(context, volume_id)
try:
# NOTE(flaper87): Verify the driver is enabled
# before going forward. The exception will be caught
@@ -1805,54 +1819,54 @@ class VolumeManager(manager.SchedulerDependentManager):
utils.require_driver_initialized(self.driver)
except exception.DriverNotInitialized:
with excutils.save_and_reraise_exception():
self.db.volume_update(ctxt, volume_id,
{'migration_status': 'error'})
volume.migration_status = 'error'
volume.save()
volume_ref = self.db.volume_get(ctxt, volume_id)
model_update = None
moved = False
status_update = None
if volume_ref['status'] in ('retyping', 'maintenance'):
status_update = {'status': volume_ref['previous_status']}
if volume.status in ('retyping', 'maintenance'):
status_update = {'status': volume.previous_status}
self.db.volume_update(ctxt, volume_ref['id'],
{'migration_status': 'migrating'})
volume.migration_status = 'migrating'
volume.save()
if not force_host_copy and new_type_id is None:
try:
LOG.debug("Issue driver.migrate_volume.", resource=volume_ref)
LOG.debug("Issue driver.migrate_volume.", resource=volume)
moved, model_update = self.driver.migrate_volume(ctxt,
volume_ref,
volume,
host)
if moved:
updates = {'host': host['host'],
'migration_status': 'success',
'previous_status': volume_ref['status']}
'previous_status': volume.status}
if status_update:
updates.update(status_update)
if model_update:
updates.update(model_update)
volume_ref = self.db.volume_update(ctxt,
volume_ref['id'],
updates)
volume.update(updates)
volume.save()
except Exception:
with excutils.save_and_reraise_exception():
updates = {'migration_status': 'error'}
if status_update:
updates.update(status_update)
self.db.volume_update(ctxt, volume_ref['id'], updates)
volume.update(updates)
volume.save()
if not moved:
try:
self._migrate_volume_generic(ctxt, volume_ref, host,
self._migrate_volume_generic(ctxt, volume, host,
new_type_id)
except Exception:
with excutils.save_and_reraise_exception():
updates = {'migration_status': 'error'}
if status_update:
updates.update(status_update)
self.db.volume_update(ctxt, volume_ref['id'], updates)
volume.update(updates)
volume.save()
LOG.info(_LI("Migrate volume completed successfully."),
resource=volume_ref)
resource=volume)
@periodic_task.periodic_task
def _report_driver_status(self, context):
@@ -3088,14 +3102,16 @@ class VolumeManager(manager.SchedulerDependentManager):
def update_migrated_volume(self, ctxt, volume, new_volume,
volume_status):
"""Finalize migration process on backend device."""
# FIXME(thangp): Remove this in v2.0 of RPC API.
if (not isinstance(volume, objects.Volume) or
not isinstance(new_volume, objects.Volume)):
volume = objects.Volume.get_by_id(ctxt, volume['id'])
new_volume = objects.Volume.get_by_id(ctxt, new_volume['id'])
model_update = None
# This is temporary fix for bug 1491210.
volume = self.db.volume_get(ctxt, volume['id'])
new_volume = self.db.volume_get(ctxt, new_volume['id'])
model_update_default = {'_name_id': new_volume['_name_id'] or
new_volume['id'],
model_update_default = {'_name_id': new_volume.name_id,
'provider_location':
new_volume['provider_location']}
new_volume.provider_location}
try:
model_update = self.driver.update_migrated_volume(ctxt,
volume,
@@ -3119,17 +3135,19 @@ class VolumeManager(manager.SchedulerDependentManager):
if volume.get('volume_metadata'):
model_update_new[key] = {
metadata['key']: metadata['value']
for metadata in volume.get('volume_metadata')}
for metadata in volume.volume_metadata}
elif key == 'admin_metadata':
model_update_new[key] = {
metadata['key']: metadata['value']
for metadata in volume.get('volume_admin_metadata')}
for metadata in volume.volume_admin_metadata}
else:
model_update_new[key] = volume[key]
self.db.volume_update(ctxt.elevated(), new_volume['id'],
model_update_new)
self.db.volume_update(ctxt.elevated(), volume['id'],
model_update_default)
with new_volume.obj_as_admin():
new_volume.update(model_update_new)
new_volume.save()
with volume.obj_as_admin():
volume.update(model_update_default)
volume.save()
# Replication V2 methods
def enable_replication(self, context, volume):

View File

@@ -83,6 +83,8 @@ class VolumeAPI(object):
1.33 - Adds support for sending objects over RPC in delete_volume().
1.34 - Adds support for sending objects over RPC in retype().
1.35 - Adds support for sending objects over RPC in extend_volume().
1.36 - Adds support for sending objects over RPC in migrate_volume(),
migrate_volume_completion(), and update_migrated_volume().
"""
BASE_RPC_API_VERSION = '1.0'
@@ -246,20 +248,35 @@ class VolumeAPI(object):
cctxt.cast(ctxt, 'extend_volume', **msg_args)
def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.8')
new_host = utils.extract_host(volume.host)
host_p = {'host': dest_host.host,
'capabilities': dest_host.capabilities}
cctxt.cast(ctxt, 'migrate_volume', volume_id=volume['id'],
host=host_p, force_host_copy=force_host_copy)
msg_args = {'volume_id': volume.id, 'host': host_p,
'force_host_copy': force_host_copy}
if self.client.can_send_version('1.36'):
version = '1.36'
msg_args['volume'] = volume
else:
version = '1.8'
cctxt = self.client.prepare(server=new_host, version=version)
cctxt.cast(ctxt, 'migrate_volume', **msg_args)
def migrate_volume_completion(self, ctxt, volume, new_volume, error):
new_host = utils.extract_host(volume['host'])
cctxt = self.client.prepare(server=new_host, version='1.10')
return cctxt.call(ctxt, 'migrate_volume_completion',
volume_id=volume['id'],
new_volume_id=new_volume['id'],
error=error)
new_host = utils.extract_host(volume.host)
msg_args = {'volume_id': volume.id, 'new_volume_id': new_volume.id,
'error': error}
if self.client.can_send_version('1.36'):
version = '1.36'
msg_args['volume'] = volume
msg_args['new_volume'] = new_volume
else:
version = '1.10'
cctxt = self.client.prepare(server=new_host, version=version)
return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args)
def retype(self, ctxt, volume, new_type_id, dest_host,
migration_policy='never', reservations=None):
@@ -296,7 +313,7 @@ class VolumeAPI(object):
def update_migrated_volume(self, ctxt, volume, new_volume,
original_volume_status):
host = utils.extract_host(new_volume['host'])
cctxt = self.client.prepare(server=host, version='1.19')
cctxt = self.client.prepare(server=host, version='1.36')
cctxt.call(ctxt,
'update_migrated_volume',
volume=volume,