Merge "Clear migration_status from a destination volume if migration fails"
This commit is contained in:
commit
2abbe19ee3
|
@ -29,6 +29,7 @@ import tempfile
|
|||
import eventlet
|
||||
import mock
|
||||
import mox
|
||||
from oslo_concurrency import processutils
|
||||
from oslo_config import cfg
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import importutils
|
||||
|
@ -3116,6 +3117,33 @@ class VolumeTestCase(BaseVolumeTestCase):
|
|||
self.assertEqual(volume['host'], 'newhost')
|
||||
self.assertIsNone(volume['migration_status'])
|
||||
|
||||
def test_migrate_volume_error(self):
|
||||
def fake_create_volume(ctxt, volume, host, req_spec, filters,
|
||||
allow_reschedule=True):
|
||||
db.volume_update(ctxt, volume['id'],
|
||||
{'status': 'available'})
|
||||
|
||||
with mock.patch.object(self.volume.driver, 'migrate_volume') as \
|
||||
mock_migrate,\
|
||||
mock.patch.object(self.volume.driver, 'create_export') as \
|
||||
mock_create_export:
|
||||
|
||||
# Exception case at self.driver.migrate_volume and create_export
|
||||
mock_migrate.side_effect = processutils.ProcessExecutionError
|
||||
mock_create_export.side_effect = processutils.ProcessExecutionError
|
||||
volume = tests_utils.create_volume(self.context, size=0,
|
||||
host=CONF.host)
|
||||
host_obj = {'host': 'newhost', 'capabilities': {}}
|
||||
self.assertRaises(processutils.ProcessExecutionError,
|
||||
self.volume.migrate_volume,
|
||||
self.context,
|
||||
volume['id'],
|
||||
host_obj,
|
||||
False)
|
||||
volume = db.volume_get(context.get_admin_context(), volume['id'])
|
||||
self.assertIsNone(volume['migration_status'])
|
||||
self.assertEqual('available', volume['status'])
|
||||
|
||||
def test_migrate_volume_generic(self):
|
||||
def fake_migr(vol, host):
|
||||
raise Exception('should not be called')
|
||||
|
@ -3156,6 +3184,109 @@ class VolumeTestCase(BaseVolumeTestCase):
|
|||
self.assertEqual(volume['status'], 'available')
|
||||
self.assertEqual(error_logs, [])
|
||||
|
||||
def test_migrate_volume_generic_copy_error(self):
|
||||
def fake_create_volume(ctxt, volume, host, req_spec, filters,
|
||||
allow_reschedule=True):
|
||||
db.volume_update(ctxt, volume['id'],
|
||||
{'status': 'available'})
|
||||
|
||||
with mock.patch.object(self.volume.driver, 'migrate_volume'),\
|
||||
mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
|
||||
as mock_create_volume,\
|
||||
mock.patch.object(self.volume.driver, 'copy_volume_data') as \
|
||||
mock_copy_volume,\
|
||||
mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
|
||||
mock.patch.object(self.volume, 'migrate_volume_completion'),\
|
||||
mock.patch.object(self.volume.driver, 'create_export'):
|
||||
|
||||
# Exception case at migrate_volume_generic
|
||||
# source_volume['migration_status'] is 'migrating'
|
||||
mock_create_volume.side_effect = fake_create_volume
|
||||
mock_copy_volume.side_effect = processutils.ProcessExecutionError
|
||||
volume = tests_utils.create_volume(self.context, size=0,
|
||||
host=CONF.host)
|
||||
host_obj = {'host': 'newhost', 'capabilities': {}}
|
||||
self.assertRaises(processutils.ProcessExecutionError,
|
||||
self.volume.migrate_volume,
|
||||
self.context,
|
||||
volume['id'],
|
||||
host_obj,
|
||||
True)
|
||||
volume = db.volume_get(context.get_admin_context(), volume['id'])
|
||||
self.assertIsNone(volume['migration_status'])
|
||||
self.assertEqual('available', volume['status'])
|
||||
|
||||
def test_migrate_volume_generic_create_export_error(self):
|
||||
def fake_create_volume(ctxt, volume, host, req_spec, filters,
|
||||
allow_reschedule=True):
|
||||
db.volume_update(ctxt, volume['id'],
|
||||
{'status': 'available'})
|
||||
|
||||
with mock.patch.object(self.volume.driver, 'migrate_volume'),\
|
||||
mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
|
||||
as mock_create_volume,\
|
||||
mock.patch.object(self.volume.driver, 'copy_volume_data') as \
|
||||
mock_copy_volume,\
|
||||
mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
|
||||
mock.patch.object(self.volume, 'migrate_volume_completion'),\
|
||||
mock.patch.object(self.volume.driver, 'create_export') as \
|
||||
mock_create_export:
|
||||
|
||||
# Exception case at create_export
|
||||
mock_create_volume.side_effect = fake_create_volume
|
||||
mock_copy_volume.side_effect = processutils.ProcessExecutionError
|
||||
mock_create_export.side_effect = processutils.ProcessExecutionError
|
||||
volume = tests_utils.create_volume(self.context, size=0,
|
||||
host=CONF.host)
|
||||
host_obj = {'host': 'newhost', 'capabilities': {}}
|
||||
self.assertRaises(processutils.ProcessExecutionError,
|
||||
self.volume.migrate_volume,
|
||||
self.context,
|
||||
volume['id'],
|
||||
host_obj,
|
||||
True)
|
||||
volume = db.volume_get(context.get_admin_context(), volume['id'])
|
||||
self.assertIsNone(volume['migration_status'])
|
||||
self.assertEqual('available', volume['status'])
|
||||
|
||||
def test_migrate_volume_generic_migrate_volume_completion_error(self):
|
||||
def fake_create_volume(ctxt, volume, host, req_spec, filters,
|
||||
allow_reschedule=True):
|
||||
db.volume_update(ctxt, volume['id'],
|
||||
{'status': 'available'})
|
||||
|
||||
def fake_migrate_volume_completion(ctxt, volume_id, new_volume_id,
|
||||
error=False):
|
||||
db.volume_update(ctxt, volume['id'],
|
||||
{'migration_status': 'completing'})
|
||||
raise processutils.ProcessExecutionError
|
||||
|
||||
with mock.patch.object(self.volume.driver, 'migrate_volume'),\
|
||||
mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
|
||||
as mock_create_volume,\
|
||||
mock.patch.object(self.volume.driver, 'copy_volume_data'),\
|
||||
mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
|
||||
mock.patch.object(self.volume, 'migrate_volume_completion')\
|
||||
as mock_migrate_compl,\
|
||||
mock.patch.object(self.volume.driver, 'create_export'):
|
||||
|
||||
# Exception case at delete_volume
|
||||
# source_volume['migration_status'] is 'completing'
|
||||
mock_create_volume.side_effect = fake_create_volume
|
||||
mock_migrate_compl.side_effect = fake_migrate_volume_completion
|
||||
volume = tests_utils.create_volume(self.context, size=0,
|
||||
host=CONF.host)
|
||||
host_obj = {'host': 'newhost', 'capabilities': {}}
|
||||
self.assertRaises(processutils.ProcessExecutionError,
|
||||
self.volume.migrate_volume,
|
||||
self.context,
|
||||
volume['id'],
|
||||
host_obj,
|
||||
True)
|
||||
volume = db.volume_get(context.get_admin_context(), volume['id'])
|
||||
self.assertIsNone(volume['migration_status'])
|
||||
self.assertEqual('available', volume['status'])
|
||||
|
||||
def _test_migrate_volume_completion(self, status='available',
|
||||
instance_uuid=None, attached_host=None,
|
||||
retyping=False):
|
||||
|
|
|
@ -442,7 +442,22 @@ class VolumeManager(manager.SchedulerDependentManager):
|
|||
|
||||
@locked_volume_operation
|
||||
def delete_volume(self, context, volume_id, unmanage_only=False):
|
||||
"""Deletes and unexports volume."""
|
||||
"""Deletes and unexports volume.
|
||||
|
||||
1. Delete a volume(normal case)
|
||||
Delete a volume and update quotas.
|
||||
|
||||
2. Delete a migration source volume
|
||||
If deleting the source volume in a migration, we want to skip
|
||||
quotas. Also we want to skip other database updates for source
|
||||
volume because these update will be handled at
|
||||
migrate_volume_completion properly.
|
||||
|
||||
3. Delete a migration destination volume
|
||||
If deleting the destination volume in a migration, we want to
|
||||
skip quotas but we need database updates for the volume.
|
||||
"""
|
||||
|
||||
context = context.elevated()
|
||||
|
||||
try:
|
||||
|
@ -493,50 +508,62 @@ class VolumeManager(manager.SchedulerDependentManager):
|
|||
volume_ref['id'],
|
||||
{'status': 'error_deleting'})
|
||||
|
||||
# If deleting the source volume in a migration, we want to skip quotas
|
||||
# and other database updates.
|
||||
if volume_ref['migration_status']:
|
||||
return True
|
||||
is_migrating = volume_ref['migration_status'] is not None
|
||||
is_migrating_dest = (is_migrating and
|
||||
volume_ref['migration_status'].startswith(
|
||||
'target:'))
|
||||
|
||||
# Get reservations
|
||||
try:
|
||||
reserve_opts = {'volumes': -1, 'gigabytes': -volume_ref['size']}
|
||||
QUOTAS.add_volume_type_opts(context,
|
||||
reserve_opts,
|
||||
volume_ref.get('volume_type_id'))
|
||||
reservations = QUOTAS.reserve(context,
|
||||
project_id=project_id,
|
||||
**reserve_opts)
|
||||
except Exception:
|
||||
reservations = None
|
||||
LOG.exception(_LE("Failed to update usages deleting volume"))
|
||||
# If deleting source/destination volume in a migration, we should
|
||||
# skip quotas.
|
||||
if not is_migrating:
|
||||
# Get reservations
|
||||
try:
|
||||
reserve_opts = {'volumes': -1,
|
||||
'gigabytes': -volume_ref['size']}
|
||||
QUOTAS.add_volume_type_opts(context,
|
||||
reserve_opts,
|
||||
volume_ref.get('volume_type_id'))
|
||||
reservations = QUOTAS.reserve(context,
|
||||
project_id=project_id,
|
||||
**reserve_opts)
|
||||
except Exception:
|
||||
reservations = None
|
||||
LOG.exception(_LE("Failed to update usages deleting volume"))
|
||||
|
||||
# Delete glance metadata if it exists
|
||||
self.db.volume_glance_metadata_delete_by_volume(context, volume_id)
|
||||
# If deleting the source volume in a migration, we should skip database
|
||||
# update here. In other cases, continue to update database entries.
|
||||
if not is_migrating or is_migrating_dest:
|
||||
|
||||
self.db.volume_destroy(context, volume_id)
|
||||
LOG.info(_LI("volume %s: deleted successfully"), volume_ref['id'])
|
||||
self._notify_about_volume_usage(context, volume_ref, "delete.end")
|
||||
# Delete glance metadata if it exists
|
||||
self.db.volume_glance_metadata_delete_by_volume(context, volume_id)
|
||||
|
||||
# Commit the reservations
|
||||
if reservations:
|
||||
QUOTAS.commit(context, reservations, project_id=project_id)
|
||||
self.db.volume_destroy(context, volume_id)
|
||||
LOG.info(_LI("volume %s: deleted successfully"), volume_ref['id'])
|
||||
|
||||
pool = vol_utils.extract_host(volume_ref['host'], 'pool')
|
||||
if pool is None:
|
||||
# Legacy volume, put them into default pool
|
||||
pool = self.driver.configuration.safe_get(
|
||||
'volume_backend_name') or vol_utils.extract_host(
|
||||
volume_ref['host'], 'pool', True)
|
||||
size = volume_ref['size']
|
||||
# If deleting source/destination volume in a migration, we should
|
||||
# skip quotas.
|
||||
if not is_migrating:
|
||||
self._notify_about_volume_usage(context, volume_ref, "delete.end")
|
||||
|
||||
try:
|
||||
self.stats['pools'][pool]['allocated_capacity_gb'] -= size
|
||||
except KeyError:
|
||||
self.stats['pools'][pool] = dict(
|
||||
allocated_capacity_gb=-size)
|
||||
# Commit the reservations
|
||||
if reservations:
|
||||
QUOTAS.commit(context, reservations, project_id=project_id)
|
||||
|
||||
self.publish_service_capabilities(context)
|
||||
pool = vol_utils.extract_host(volume_ref['host'], 'pool')
|
||||
if pool is None:
|
||||
# Legacy volume, put them into default pool
|
||||
pool = self.driver.configuration.safe_get(
|
||||
'volume_backend_name') or vol_utils.extract_host(
|
||||
volume_ref['host'], 'pool', True)
|
||||
size = volume_ref['size']
|
||||
|
||||
try:
|
||||
self.stats['pools'][pool]['allocated_capacity_gb'] -= size
|
||||
except KeyError:
|
||||
self.stats['pools'][pool] = dict(
|
||||
allocated_capacity_gb=-size)
|
||||
|
||||
self.publish_service_capabilities(context)
|
||||
|
||||
return True
|
||||
|
||||
|
@ -1091,11 +1118,30 @@ class VolumeManager(manager.SchedulerDependentManager):
|
|||
LOG.error(msg % {'vol1': volume['id'],
|
||||
'vol2': new_volume['id']})
|
||||
volume = self.db.volume_get(ctxt, volume['id'])
|
||||
# If we're in the completing phase don't delete the target
|
||||
# because we may have already deleted the source!
|
||||
|
||||
# If we're in the migrating phase, we need to cleanup
|
||||
# destination volume because source volume is remaining
|
||||
if volume['migration_status'] == 'migrating':
|
||||
rpcapi.delete_volume(ctxt, new_volume)
|
||||
new_volume['migration_status'] = None
|
||||
else:
|
||||
# If we're in the completing phase don't delete the
|
||||
# destination because we may have already deleted the
|
||||
# source! But the migration_status in database should
|
||||
# be cleared to handle volume after migration failure
|
||||
try:
|
||||
updates = {'migration_status': None}
|
||||
self.db.volume_update(ctxt, new_volume['id'], updates)
|
||||
except exception.VolumeNotFound:
|
||||
LOG.info(_LI("Couldn't find destination volume "
|
||||
"%(vol)s in database. The entry might be "
|
||||
"successfully deleted during migration "
|
||||
"completion phase."),
|
||||
{'vol': new_volume['id']})
|
||||
|
||||
LOG.warn(_LW("Failed to migrate volume. The destination "
|
||||
"volume %(vol)s is not deleted since the "
|
||||
"source volume may have already deleted."),
|
||||
{'vol': new_volume['id']})
|
||||
|
||||
def _get_original_status(self, volume):
|
||||
if (volume['instance_uuid'] is None and
|
||||
|
@ -1130,7 +1176,6 @@ class VolumeManager(manager.SchedulerDependentManager):
|
|||
"for volume %(vol1)s (temporary volume %(vol2)s")
|
||||
LOG.info(msg % {'vol1': volume['id'],
|
||||
'vol2': new_volume['id']})
|
||||
new_volume['migration_status'] = None
|
||||
rpcapi.delete_volume(ctxt, new_volume)
|
||||
updates = {'migration_status': None, 'status': orig_volume_status}
|
||||
self.db.volume_update(ctxt, volume_id, updates)
|
||||
|
@ -1217,10 +1262,16 @@ class VolumeManager(manager.SchedulerDependentManager):
|
|||
updates = {'migration_status': None}
|
||||
if status_update:
|
||||
updates.update(status_update)
|
||||
model_update = self.driver.create_export(ctxt, volume_ref)
|
||||
if model_update:
|
||||
updates.update(model_update)
|
||||
self.db.volume_update(ctxt, volume_ref['id'], updates)
|
||||
try:
|
||||
model_update = self.driver.create_export(ctxt,
|
||||
volume_ref)
|
||||
if model_update:
|
||||
updates.update(model_update)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Failed to create export for "
|
||||
"volume: %s"), volume_ref['id'])
|
||||
finally:
|
||||
self.db.volume_update(ctxt, volume_ref['id'], updates)
|
||||
if not moved:
|
||||
try:
|
||||
self._migrate_volume_generic(ctxt, volume_ref, host,
|
||||
|
@ -1230,10 +1281,16 @@ class VolumeManager(manager.SchedulerDependentManager):
|
|||
updates = {'migration_status': None}
|
||||
if status_update:
|
||||
updates.update(status_update)
|
||||
model_update = self.driver.create_export(ctxt, volume_ref)
|
||||
if model_update:
|
||||
updates.update(model_update)
|
||||
self.db.volume_update(ctxt, volume_ref['id'], updates)
|
||||
try:
|
||||
model_update = self.driver.create_export(ctxt,
|
||||
volume_ref)
|
||||
if model_update:
|
||||
updates.update(model_update)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Failed to create export for "
|
||||
"volume: %s"), volume_ref['id'])
|
||||
finally:
|
||||
self.db.volume_update(ctxt, volume_ref['id'], updates)
|
||||
|
||||
@periodic_task.periodic_task
|
||||
def _report_driver_status(self, context):
|
||||
|
|
Loading…
Reference in New Issue