Make live migration checks async

This change makes the checks that happen before a live migration
non-blocking so that the user can receive an HTTP accepted and
later check the results of the live-migration via instance-actions.

bp: async-live-migration-rest-check

Change-Id: I4f788d48ec0e166898184842ac55560163c1b343
This commit is contained in:
Timofey Durakov 2016-04-29 16:39:36 +03:00
parent baaeec1b63
commit 15f5aa013e
7 changed files with 102 additions and 42 deletions

View File

@ -3225,7 +3225,7 @@ class API(base.Base):
@check_instance_cell
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.PAUSED])
def live_migrate(self, context, instance, block_migration,
disk_over_commit, host_name, force=None):
disk_over_commit, host_name, force=None, async=False):
"""Migrate a server lively to a new host."""
LOG.debug("Going to try to live migrate instance to %s",
host_name or "another host", instance=instance)
@ -3270,7 +3270,7 @@ class API(base.Base):
self.compute_task_api.live_migrate_instance(context, instance,
host_name, block_migration=block_migration,
disk_over_commit=disk_over_commit,
request_spec=request_spec)
request_spec=request_spec, async=async)
except oslo_exceptions.MessagingTimeout as messaging_timeout:
with excutils.save_and_reraise_exception():
# NOTE(pkoniszewski): It is possible that MessagingTimeout

View File

@ -79,9 +79,9 @@ class LocalComputeTaskAPI(object):
block_migration, disk_over_commit,
request_spec=None):
scheduler_hint = {'host': host_name}
self._manager.migrate_server(
context, instance, scheduler_hint, True, False, None,
block_migration, disk_over_commit, None, request_spec=request_spec)
self._manager.live_migrate_instance(context, instance, scheduler_hint,
block_migration, disk_over_commit,
request_spec)
def build_instances(self, context, instances, image,
filter_properties, admin_password, injected_files,
@ -189,11 +189,17 @@ class ComputeTaskAPI(object):
def live_migrate_instance(self, context, instance, host_name,
block_migration, disk_over_commit,
request_spec=None):
request_spec=None, async=False):
scheduler_hint = {'host': host_name}
self.conductor_compute_rpcapi.migrate_server(
context, instance, scheduler_hint, True, False, None,
block_migration, disk_over_commit, None, request_spec=request_spec)
if async:
self.conductor_compute_rpcapi.live_migrate_instance(
context, instance, scheduler_hint, block_migration,
disk_over_commit, request_spec)
else:
self.conductor_compute_rpcapi.migrate_server(
context, instance, scheduler_hint, True, False, None,
block_migration, disk_over_commit, None,
request_spec=request_spec)
def build_instances(self, context, instances, image, filter_properties,
admin_password, injected_files, requested_networks,

View File

@ -145,7 +145,7 @@ class ComputeTaskManager(base.Base):
may involve coordinating activities on multiple compute nodes.
"""
target = messaging.Target(namespace='compute_task', version='1.14')
target = messaging.Target(namespace='compute_task', version='1.15')
def __init__(self):
super(ComputeTaskManager, self).__init__()
@ -161,6 +161,8 @@ class ComputeTaskManager(base.Base):
compute_rpcapi.LAST_VERSION = None
self.compute_rpcapi = compute_rpcapi.ComputeAPI()
# TODO(tdurakov): remove `live` parameter here on compute task api RPC
# version bump to 2.x
@messaging.expected_exceptions(
exception.NoValidHost,
exception.ComputeServiceUnavailable,
@ -288,6 +290,12 @@ class ComputeTaskManager(base.Base):
# exception will be raised by instance.save()
pass
@wrap_instance_event(prefix='conductor')
def live_migrate_instance(self, context, instance, scheduler_hint,
block_migration, disk_over_commit, request_spec):
self._live_migrate(context, instance, scheduler_hint,
block_migration, disk_over_commit, request_spec)
def _live_migrate(self, context, instance, scheduler_hint,
block_migration, disk_over_commit, request_spec):
destination = scheduler_hint.get("host")

View File

@ -266,6 +266,7 @@ class ComputeTaskAPI(object):
1.12 - Added request_spec to rebuild_instance()
1.13 - Added request_spec to migrate_server()
1.14 - Added request_spec to unshelve_instance()
1.15 - Added live_migrate_instance
"""
def __init__(self):
@ -276,6 +277,17 @@ class ComputeTaskAPI(object):
serializer = objects_base.NovaObjectSerializer()
self.client = rpc.get_client(target, serializer=serializer)
def live_migrate_instance(self, context, instance, scheduler_hint,
block_migration, disk_over_commit, request_spec):
kw = {'instance': instance, 'scheduler_hint': scheduler_hint,
'block_migration': block_migration,
'disk_over_commit': disk_over_commit,
'request_spec': request_spec,
}
version = '1.15'
cctxt = self.client.prepare(version=version)
cctxt.cast(context, 'live_migrate_instance', **kw)
def migrate_server(self, context, instance, scheduler_hint, live, rebuild,
flavor, block_migration, disk_over_commit,
reservations=None, clean_shutdown=True, request_spec=None):

View File

@ -9940,7 +9940,7 @@ class ComputeAPITestCase(BaseTestCase):
block_migration=True,
disk_over_commit=True,
host_name='fake_dest_host',
force=force)
force=force, async=False)
record_action_start.assert_called_once_with(self.context, instance,
'live-migration')
@ -9952,7 +9952,7 @@ class ComputeAPITestCase(BaseTestCase):
self.context, instance, host,
block_migration=True,
disk_over_commit=True,
request_spec=fake_spec)
request_spec=fake_spec, async=False)
do_test()
instance.refresh()

View File

@ -1970,7 +1970,8 @@ class _ComputeAPIUnitTestMixIn(object):
'fake_dest_host',
block_migration=True,
disk_over_commit=True,
request_spec=fake_spec)
request_spec=fake_spec,
async=False)
def test_swap_volume_volume_api_usage(self):
# This test ensures that volume_id arguments are passed to volume_api

View File

@ -326,35 +326,6 @@ class _BaseTaskTestCase(object):
return rebuild_args, compute_rebuild_args
@mock.patch('nova.objects.Migration')
def test_live_migrate(self, migobj):
inst = fake_instance.fake_db_instance()
inst_obj = objects.Instance._from_db_object(
self.context, objects.Instance(), inst, [])
migration = migobj()
self.mox.StubOutWithMock(live_migrate.LiveMigrationTask, 'execute')
task = self.conductor_manager._build_live_migrate_task(
self.context, inst_obj, 'destination', 'block_migration',
'disk_over_commit', migration)
task.execute()
self.mox.ReplayAll()
if isinstance(self.conductor, (conductor_api.ComputeTaskAPI,
conductor_api.LocalComputeTaskAPI)):
# The API method is actually 'live_migrate_instance'. It gets
# converted into 'migrate_server' when doing RPC.
self.conductor.live_migrate_instance(self.context, inst_obj,
'destination', 'block_migration', 'disk_over_commit')
else:
self.conductor.migrate_server(self.context, inst_obj,
{'host': 'destination'}, True, False, None,
'block_migration', 'disk_over_commit')
self.assertEqual('accepted', migration.status)
self.assertEqual('destination', migration.dest_compute)
self.assertEqual(inst_obj.host, migration.source_compute)
@mock.patch.object(migrate.MigrationTask, 'execute')
@mock.patch.object(utils, 'get_image_from_system_metadata')
@mock.patch.object(objects.RequestSpec, 'from_components')
@ -1871,6 +1842,31 @@ class ConductorTaskRPCAPITestCase(_BaseTaskTestCase,
service_manager = self.conductor_service.manager
self.conductor_manager = service_manager.compute_task_mgr
def test_live_migrate_instance(self):
inst = fake_instance.fake_db_instance()
inst_obj = objects.Instance._from_db_object(
self.context, objects.Instance(), inst, [])
version = '1.15'
scheduler_hint = {'host': 'destination'}
cctxt_mock = mock.MagicMock()
@mock.patch.object(self.conductor.client, 'prepare',
return_value=cctxt_mock)
def _test(prepare_mock):
self.conductor.live_migrate_instance(
self.context, inst_obj, scheduler_hint,
'block_migration', 'disk_over_commit', request_spec=None)
prepare_mock.assert_called_once_with(version=version)
kw = {'instance': inst_obj, 'scheduler_hint': scheduler_hint,
'block_migration': 'block_migration',
'disk_over_commit': 'disk_over_commit',
'request_spec': None,
}
cctxt_mock.cast.assert_called_once_with(
self.context, 'live_migrate_instance', **kw)
_test()
class ConductorTaskAPITestCase(_BaseTaskTestCase, test_compute.BaseTestCase):
"""Compute task API Tests."""
@ -1882,6 +1878,20 @@ class ConductorTaskAPITestCase(_BaseTaskTestCase, test_compute.BaseTestCase):
service_manager = self.conductor_service.manager
self.conductor_manager = service_manager.compute_task_mgr
def test_live_migrate(self):
inst = fake_instance.fake_db_instance()
inst_obj = objects.Instance._from_db_object(
self.context, objects.Instance(), inst, [])
with mock.patch.object(self.conductor.conductor_compute_rpcapi,
'migrate_server') as mock_migrate_server:
self.conductor.live_migrate_instance(self.context, inst_obj,
'destination', 'block_migration', 'disk_over_commit')
mock_migrate_server.assert_called_once_with(
self.context, inst_obj, {'host': 'destination'}, True, False,
None, 'block_migration', 'disk_over_commit', None,
request_spec=None)
class ConductorLocalComputeTaskAPITestCase(ConductorTaskAPITestCase):
"""Conductor LocalComputeTaskAPI Tests."""
@ -1889,3 +1899,26 @@ class ConductorLocalComputeTaskAPITestCase(ConductorTaskAPITestCase):
super(ConductorLocalComputeTaskAPITestCase, self).setUp()
self.conductor = conductor_api.LocalComputeTaskAPI()
self.conductor_manager = self.conductor._manager._target
@mock.patch('nova.objects.Migration')
def test_live_migrate(self, migobj):
inst = fake_instance.fake_db_instance()
inst_obj = objects.Instance._from_db_object(
self.context, objects.Instance(), inst, [])
migration = migobj()
task = mock.MagicMock()
with mock.patch.object(self.conductor_manager,
'_build_live_migrate_task',
return_value=task) as mock_build_task:
self.conductor.live_migrate_instance(self.context, inst_obj,
'destination', 'block_migration', 'disk_over_commit')
mock_build_task.assert_called_once_with(self.context, inst_obj,
'destination',
'block_migration',
'disk_over_commit',
migration, None)
task.execute.assert_called_once()
self.assertEqual('accepted', migration.status)
self.assertEqual('destination', migration.dest_compute)
self.assertEqual(inst_obj.host, migration.source_compute)