diff --git a/nova/exception.py b/nova/exception.py
index a42750214593..0c049ad9adad 100644
--- a/nova/exception.py
+++ b/nova/exception.py
@@ -1133,6 +1133,10 @@ class NoUniqueMatch(NovaException):
     code = 409
 
 
+class NoActiveMigrationForInstance(NotFound):
+    msg_fmt = _("Active live migration for instance %(instance_id)s not found")
+
+
 class MigrationNotFound(NotFound):
     msg_fmt = _("Migration %(migration_id)s could not be found.")
 
diff --git a/nova/tests/unit/virt/libvirt/test_driver.py b/nova/tests/unit/virt/libvirt/test_driver.py
index f23e7c016bd4..d92e5981c3f1 100644
--- a/nova/tests/unit/virt/libvirt/test_driver.py
+++ b/nova/tests/unit/virt/libvirt/test_driver.py
@@ -13,6 +13,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+from collections import deque
 import contextlib
 import copy
 import datetime
@@ -7689,10 +7690,12 @@ class LibvirtConnTestCase(test.NoDBTestCase):
     @mock.patch.object(objects.Migration, "save")
     @mock.patch.object(fakelibvirt.Connection, "_mark_running")
     @mock.patch.object(fakelibvirt.virDomain, "abortJob")
+    @mock.patch.object(libvirt_driver.LibvirtDriver, "pause")
     def _test_live_migration_monitoring(self,
                                         job_info_records,
                                         time_records,
                                         expect_result,
+                                        mock_pause,
                                         mock_abort,
                                         mock_running,
                                         mock_save,
@@ -7701,9 +7704,12 @@ class LibvirtConnTestCase(test.NoDBTestCase):
                                         mock_conn,
                                         mock_sleep,
                                         mock_time,
-                                        expected_mig_status=None):
+                                        expected_mig_status=None,
+                                        scheduled_action=None,
+                                        scheduled_action_executed=False):
         drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
         instance = objects.Instance(**self.test_instance)
+        drvr.active_migrations[instance.uuid] = deque()
         dom = fakelibvirt.Domain(drvr._get_connection(), "", True)
         guest = libvirt_guest.Guest(dom)
         finish_event = eventlet.event.Event()
@@ -7718,6 +7724,8 @@ class LibvirtConnTestCase(test.NoDBTestCase):
                         finish_event.send()
                     elif rec == "domain-stop":
                         dom.destroy()
+                    elif rec == "force_complete":
+                        drvr.active_migrations[instance.uuid].append("pause")
                     else:
                         if len(time_records) > 0:
                             time_records.pop(0)
@@ -7751,7 +7759,12 @@ class LibvirtConnTestCase(test.NoDBTestCase):
                                      dom,
                                      finish_event,
                                      [])
-
+        if scheduled_action_executed:
+            if scheduled_action == 'pause':
+                self.assertTrue(mock_pause.called)
+        else:
+            if scheduled_action == 'pause':
+                self.assertFalse(mock_pause.called)
         mock_mig_save.assert_called_with()
 
         if expect_result == self.EXPECT_SUCCESS:
@@ -7777,6 +7790,7 @@ class LibvirtConnTestCase(test.NoDBTestCase):
             else:
                 fake_recover_method.assert_called_once_with(
                     self.context, instance, dest, False, migrate_data)
+        self.assertNotIn(instance.uuid, drvr.active_migrations)
 
     def test_live_migration_monitor_success(self):
         # A normal sequence where see all the normal job states
@@ -7798,6 +7812,129 @@ class LibvirtConnTestCase(test.NoDBTestCase):
         self._test_live_migration_monitoring(domain_info_records, [],
                                              self.EXPECT_SUCCESS)
 
+    def test_live_migration_handle_pause_normal(self):
+        # A normal sequence where see all the normal job states, and pause
+        # scheduled in between VIR_DOMAIN_JOB_UNBOUNDED
+        domain_info_records = [
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_NONE),
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_UNBOUNDED),
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_UNBOUNDED),
+            "force_complete",
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_UNBOUNDED),
+            "thread-finish",
+            "domain-stop",
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_COMPLETED),
+        ]
+
+        self._test_live_migration_monitoring(domain_info_records, [],
+                                             self.EXPECT_SUCCESS,
+                                             scheduled_action="pause",
+                                             scheduled_action_executed=True)
+
+    def test_live_migration_handle_pause_on_start(self):
+        # A normal sequence where see all the normal job states, and pause
+        # scheduled in case of job type VIR_DOMAIN_JOB_NONE and finish_event is
+        # not ready yet
+        domain_info_records = [
+            "force_complete",
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_NONE),
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_UNBOUNDED),
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_UNBOUNDED),
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_UNBOUNDED),
+            "thread-finish",
+            "domain-stop",
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_COMPLETED),
+        ]
+
+        self._test_live_migration_monitoring(domain_info_records, [],
+                                             self.EXPECT_SUCCESS,
+                                             scheduled_action="pause",
+                                             scheduled_action_executed=True)
+
+    def test_live_migration_handle_pause_on_finish(self):
+        # A normal sequence where see all the normal job states, and pause
+        # scheduled in case of job type VIR_DOMAIN_JOB_NONE and finish_event is
+        # ready
+        domain_info_records = [
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_NONE),
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_UNBOUNDED),
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_UNBOUNDED),
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_UNBOUNDED),
+            "thread-finish",
+            "domain-stop",
+            "force_complete",
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_COMPLETED),
+        ]
+
+        self._test_live_migration_monitoring(domain_info_records, [],
+                                             self.EXPECT_SUCCESS,
+                                             scheduled_action="pause",
+                                             scheduled_action_executed=False)
+
+    def test_live_migration_handle_pause_on_cancel(self):
+        # A normal sequence where see all the normal job states, and pause
+        # scheduled in case of job type VIR_DOMAIN_JOB_CANCELLED
+        domain_info_records = [
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_NONE),
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_UNBOUNDED),
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_UNBOUNDED),
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_UNBOUNDED),
+            "thread-finish",
+            "domain-stop",
+            "force_complete",
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_CANCELLED),
+        ]
+
+        self._test_live_migration_monitoring(domain_info_records, [],
+                                             self.EXPECT_FAILURE,
+                                             expected_mig_status='cancelled',
+                                             scheduled_action="pause",
+                                             scheduled_action_executed=False)
+
+    def test_live_migration_handle_pause_on_failure(self):
+        # A normal sequence where see all the normal job states, and pause
+        # scheduled in case of job type VIR_DOMAIN_JOB_FAILED
+        domain_info_records = [
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_NONE),
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_UNBOUNDED),
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_UNBOUNDED),
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_UNBOUNDED),
+            "thread-finish",
+            "domain-stop",
+            "force_complete",
+            host.DomainJobInfo(
+                type=fakelibvirt.VIR_DOMAIN_JOB_FAILED),
+        ]
+
+        self._test_live_migration_monitoring(domain_info_records, [],
+                                             self.EXPECT_FAILURE,
+                                             scheduled_action="pause",
+                                             scheduled_action_executed=False)
+
     def test_live_migration_monitor_success_race(self):
         # A normalish sequence but we're too slow to see the
         # completed job state
@@ -13401,11 +13538,15 @@ class LibvirtConnTestCase(test.NoDBTestCase):
                           lambda x: x,
                           lambda x: x)
 
"pause") - def test_live_migration_force_complete(self, pause): + def test_live_migration_force_complete(self): drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) - drvr.live_migration_force_complete(self.test_instance) - pause.assert_called_once_with(self.test_instance) + instance = fake_instance.fake_instance_obj( + None, name='instancename', id=1, + uuid='c83a75d4-4d53-4be5-9a40-04d9c0389ff8') + drvr.active_migrations[instance.uuid] = deque() + drvr.live_migration_force_complete(instance) + self.assertEqual( + 1, drvr.active_migrations[instance.uuid].count("pause")) @mock.patch.object(host.Host, "get_connection") @mock.patch.object(fakelibvirt.virDomain, "abortJob") diff --git a/nova/tests/unit/virt/test_virt_drivers.py b/nova/tests/unit/virt/test_virt_drivers.py index 1f1222c612ff..30a06f9c73c5 100644 --- a/nova/tests/unit/virt/test_virt_drivers.py +++ b/nova/tests/unit/virt/test_virt_drivers.py @@ -13,6 +13,7 @@ # under the License. import base64 +from collections import deque import sys import traceback @@ -667,6 +668,7 @@ class _VirtDriverTestCase(_FakeDriverBackendTestCase): @catch_notimplementederror def test_live_migration_force_complete(self): instance_ref, network_info = self._get_running_instance() + self.connection.active_migrations[instance_ref.uuid] = deque() self.connection.live_migration_force_complete(instance_ref) @catch_notimplementederror diff --git a/nova/virt/fake.py b/nova/virt/fake.py index 8d94ebff1869..8aae5af447bf 100644 --- a/nova/virt/fake.py +++ b/nova/virt/fake.py @@ -152,6 +152,7 @@ class FakeDriver(driver.ComputeDriver): } self._mounts = {} self._interfaces = {} + self.active_migrations = {} if not _FAKE_NODES: set_nodes([CONF.host]) diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py index cc08fffb20f4..8ebccc520370 100644 --- a/nova/virt/libvirt/driver.py +++ b/nova/virt/libvirt/driver.py @@ -26,6 +26,7 @@ Supports KVM, LXC, QEMU, UML, XEN and Parallels. """ import collections +from collections import deque import contextlib import errno import functools @@ -369,6 +370,7 @@ class LibvirtDriver(driver.ComputeDriver): self._remotefs = remotefs.RemoteFilesystem() self._live_migration_flags = self._block_migration_flags = 0 + self.active_migrations = {} def _get_volume_drivers(self): return libvirt_volume_drivers @@ -6034,6 +6036,7 @@ class LibvirtDriver(driver.ComputeDriver): recover_method, block_migration, migrate_data, dom, finish_event, disk_paths): + on_migration_failure = deque() data_gb = self._live_migration_data_gb(instance, disk_paths) downtime_steps = list(self._migration_downtime_steps(data_gb)) completion_timeout = int( @@ -6041,6 +6044,36 @@ class LibvirtDriver(driver.ComputeDriver): progress_timeout = CONF.libvirt.live_migration_progress_timeout migration = migrate_data.migration + def _check_scheduled_migration_task(): + tasks = self.active_migrations.get(instance.uuid, deque()) + while tasks: + task = tasks.popleft() + if task == 'pause': + try: + self.pause(instance) + on_migration_failure.append("unpause") + except Exception as e: + LOG.warning(_LW("Failed to pause instance during " + "live-migration %s"), + e, instance=instance) + + def _recover_scheduled_migration_task(): + while on_migration_failure: + task = on_migration_failure.popleft() + # NOTE(tdurakov): there is still possibility to leave + # instance paused in case of live-migration failure. 
+                # This check guarantees that the instance will be resumed
+                # in this case
+                if task == 'unpause':
+                    try:
+                        state = guest.get_power_state(self._host)
+                        if state == power_state.PAUSED:
+                            guest.resume()
+                    except Exception as e:
+                        LOG.warning(_LW("Failed to resume paused instance "
+                                        "before live-migration rollback %s"),
+                                    e, instance=instance)
+
         n = 0
         start = time.time()
         progress_time = start
@@ -6100,6 +6133,7 @@ class LibvirtDriver(driver.ComputeDriver):
                 # This is where we wire up calls to change live
                 # migration status. eg change max downtime, cancel
                 # the operation, change max bandwidth
+                _check_scheduled_migration_task()
                 now = time.time()
                 elapsed = now - start
                 abort = False
@@ -6128,6 +6162,7 @@ class LibvirtDriver(driver.ComputeDriver):
                     except libvirt.libvirtError as e:
                         LOG.warning(_LW("Failed to abort migration %s"),
                                     e, instance=instance)
+                        self._clear_empty_migration(instance)
                         raise
 
                 # See if we need to increase the max downtime. We
@@ -6217,6 +6252,7 @@ class LibvirtDriver(driver.ComputeDriver):
                 # Migration did not succeed
                 LOG.error(_LE("Migration operation has aborted"),
                           instance=instance)
+                _recover_scheduled_migration_task()
                 recover_method(context, instance, dest, block_migration,
                                migrate_data)
                 break
@@ -6224,6 +6260,7 @@ class LibvirtDriver(driver.ComputeDriver):
                 # Migration was stopped by admin
                 LOG.warning(_LW("Migration operation was cancelled"),
                             instance=instance)
+                _recover_scheduled_migration_task()
                 recover_method(context, instance, dest, block_migration,
                                migrate_data, migration_status='cancelled')
                 break
@@ -6232,6 +6269,14 @@ class LibvirtDriver(driver.ComputeDriver):
                             info.type, instance=instance)
 
                 time.sleep(0.5)
+        self._clear_empty_migration(instance)
+
+    def _clear_empty_migration(self, instance):
+        try:
+            del self.active_migrations[instance.uuid]
+        except KeyError:
+            LOG.warning(_LW("There are no records in active migrations "
+                            "for instance"), instance=instance)
 
     def _live_migration(self, context, instance, dest, post_method,
                         recover_method, block_migration,
@@ -6277,6 +6322,7 @@ class LibvirtDriver(driver.ComputeDriver):
                                                       device_names)
 
         finish_event = eventlet.event.Event()
+        self.active_migrations[instance.uuid] = deque()
 
         def thread_finished(thread, event):
             LOG.debug("Migration operation thread notification",
@@ -6306,7 +6352,11 @@ class LibvirtDriver(driver.ComputeDriver):
         # NOTE(pkoniszewski): currently only pause during live migration is
         # supported to force live migration to complete, so just try to pause
         # the instance
-        self.pause(instance)
+        try:
+            self.active_migrations[instance.uuid].append('pause')
+        except KeyError:
+            raise exception.NoActiveMigrationForInstance(
+                instance_id=instance.uuid)
 
     def _try_fetch_image(self, context, path, image_id, instance,
                          fallback_from_host=None):
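
As a reading aid only, not part of the patch: the sketch below mimics the hand-off this change introduces. live_migration_force_complete() merely enqueues a 'pause' task on the per-instance active_migrations deque, the monitoring loop drains and executes it, and the record is dropped when monitoring ends. FakeMigrationDriver and its method names are illustrative stand-ins, not Nova APIs.

from collections import deque


class FakeMigrationDriver(object):
    """Minimal stand-in illustrating the queue hand-off used by the patch."""

    def __init__(self):
        # uuid -> deque of scheduled tasks, mirroring active_migrations
        self.active_migrations = {}
        self.paused = False

    def pause(self, instance_uuid):
        self.paused = True

    def start_migration(self, instance_uuid):
        # The migration thread registers the queue before monitoring starts.
        self.active_migrations[instance_uuid] = deque()

    def live_migration_force_complete(self, instance_uuid):
        # API side: only schedule the task; a missing key means there is
        # no active migration for this instance.
        try:
            self.active_migrations[instance_uuid].append('pause')
        except KeyError:
            raise RuntimeError('no active migration for %s' % instance_uuid)

    def monitor_step(self, instance_uuid):
        # Monitor side: drain queued tasks and execute them.
        tasks = self.active_migrations.get(instance_uuid, deque())
        while tasks:
            if tasks.popleft() == 'pause':
                self.pause(instance_uuid)

    def finish_migration(self, instance_uuid):
        # Drop the per-instance record once monitoring is done.
        self.active_migrations.pop(instance_uuid, None)


if __name__ == '__main__':
    drvr = FakeMigrationDriver()
    uuid = 'c83a75d4-4d53-4be5-9a40-04d9c0389ff8'
    drvr.start_migration(uuid)
    drvr.live_migration_force_complete(uuid)   # queues 'pause'
    drvr.monitor_step(uuid)                    # monitor executes it
    assert drvr.paused
    drvr.finish_migration(uuid)
    assert uuid not in drvr.active_migrations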