diff --git a/lower-constraints.txt b/lower-constraints.txt index da681ff1968a..c2a27a41dd35 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -33,6 +33,7 @@ fixtures==3.0.0 flake8==2.5.5 future==0.16.0 futurist==1.6.0 +futures==3.0.0 gabbi==1.35.0 gitdb2==2.0.3 GitPython==2.1.8 diff --git a/nova/compute/manager.py b/nova/compute/manager.py index cee21c0287f7..d8e4abca6090 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -27,6 +27,9 @@ terminating it. import base64 import binascii +# If py2, concurrent.futures comes from the futures library otherwise it +# comes from the py3 standard library. +from concurrent import futures import contextlib import functools import inspect @@ -521,10 +524,16 @@ class ComputeManager(manager.Manager): else: self._build_semaphore = compute_utils.UnlimitedSemaphore() if max(CONF.max_concurrent_live_migrations, 0) != 0: - self._live_migration_semaphore = eventlet.semaphore.Semaphore( - CONF.max_concurrent_live_migrations) + self._live_migration_executor = futures.ThreadPoolExecutor( + max_workers=CONF.max_concurrent_live_migrations) else: - self._live_migration_semaphore = compute_utils.UnlimitedSemaphore() + # Starting in python 3.5, this is technically bounded, but it's + # ncpu * 5 which is probably much higher than anyone would sanely + # use for concurrently running live migrations. + self._live_migration_executor = futures.ThreadPoolExecutor() + # This is a dict, keyed by migration uuid, to a two-item tuple of + # migration object and Future for the queued live migration. + self._waiting_live_migrations = {} super(ComputeManager, self).__init__(service_name="compute", *args, **kwargs) @@ -1152,6 +1161,26 @@ class ComputeManager(manager.Manager): self.driver.register_event_listener(None) self.instance_events.cancel_all_events() self.driver.cleanup_host(host=self.host) + self._cleanup_live_migrations_in_pool() + + def _cleanup_live_migrations_in_pool(self): + # Shutdown the pool so we don't get new requests. + self._live_migration_executor.shutdown(wait=False) + # For any queued migrations, cancel the migration and update + # its status. + for migration, future in self._waiting_live_migrations.values(): + # If we got here before the Future was submitted then we need + # to move on since there isn't anything we can do. + if future is None: + continue + if future.cancel(): + self._set_migration_status(migration, 'cancelled') + LOG.info('Successfully cancelled queued live migration.', + instance_uuid=migration.instance_uuid) + else: + LOG.warning('Unable to cancel live migration.', + instance_uuid=migration.instance_uuid) + self._waiting_live_migrations.clear() def pre_start_hook(self): """After the service is initialized, but before we fully bring @@ -6146,6 +6175,9 @@ class ComputeManager(manager.Manager): # done on source/destination. For now, this is just here for status # reporting self._set_migration_status(migration, 'preparing') + # NOTE(Kevin_Zheng): The migration is no longer in the `queued` status + # so lets remove it from the mapping. + self._waiting_live_migrations.pop(instance.uuid) class _BreakWaitForInstanceEvent(Exception): """Used as a signal to stop waiting for the network-vif-plugged @@ -6257,18 +6289,23 @@ class ComputeManager(manager.Manager): """ self._set_migration_status(migration, 'queued') - - def dispatch_live_migration(*args, **kwargs): - with self._live_migration_semaphore: - self._do_live_migration(*args, **kwargs) - - # NOTE(danms): We spawn here to return the RPC worker thread back to - # the pool. Since what follows could take a really long time, we don't - # want to tie up RPC workers. - utils.spawn_n(dispatch_live_migration, - context, dest, instance, - block_migration, migration, - migrate_data) + # NOTE(Kevin_Zheng): Submit the live_migration job to the pool and + # put the returned Future object into dict mapped with migration.uuid + # in order to be able to track and abort it in the future. + self._waiting_live_migrations[instance.uuid] = (None, None) + try: + future = self._live_migration_executor.submit( + self._do_live_migration, context, dest, instance, + block_migration, migration, migrate_data) + self._waiting_live_migrations[instance.uuid] = (migration, future) + except RuntimeError: + # ThreadPoolExecutor.submit will raise RuntimeError if the pool + # is shutdown, which happens in _cleanup_live_migrations_in_pool. + LOG.info('Migration %s failed to submit as the compute service ' + 'is shutting down.', migration.uuid, instance=instance) + self._set_migration_status(migration, 'error') + raise exception.LiveMigrationNotSubmitted( + migration_uuid=migration.uuid, instance_uuid=instance.uuid) @wrap_exception() @wrap_instance_event(prefix='compute') diff --git a/nova/exception.py b/nova/exception.py index 5fb0b71db958..a0df4a1a0012 100644 --- a/nova/exception.py +++ b/nova/exception.py @@ -1827,6 +1827,11 @@ class InvalidWatchdogAction(Invalid): msg_fmt = _("Provided watchdog action (%(action)s) is not supported.") +class LiveMigrationNotSubmitted(NovaException): + msg_fmt = _("Failed to submit live migration %(migration_uuid)s for " + "instance %(instance_uuid)s for processing.") + + class SelectionObjectsWithOldRPCVersionNotSupported(NovaException): msg_fmt = _("Requests for Selection objects with alternates are not " "supported in select_destinations() before RPC version 4.5; " diff --git a/nova/tests/fixtures.py b/nova/tests/fixtures.py index 67fe98ca4791..7db87df5d3f3 100644 --- a/nova/tests/fixtures.py +++ b/nova/tests/fixtures.py @@ -1010,6 +1010,26 @@ class SpawnIsSynchronousFixture(fixtures.Fixture): 'nova.utils.spawn', _FakeGreenThread)) +class SynchronousThreadPoolExecutorFixture(fixtures.Fixture): + """Make ThreadPoolExecutor.submit() synchronous. + + The function passed to submit() will be executed and a mock.Mock + object will be returned as the Future where Future.result() will + return the result of the call to the submitted function. + """ + def setUp(self): + super(SynchronousThreadPoolExecutorFixture, self).setUp() + + def fake_submit(_self, fn, *args, **kwargs): + result = fn(*args, **kwargs) + future = mock.Mock(spec='concurrent.futures.Future') + future.return_value.result.return_value = result + return future + self.useFixture(fixtures.MonkeyPatch( + 'concurrent.futures.ThreadPoolExecutor.submit', + fake_submit)) + + class BannedDBSchemaOperations(fixtures.Fixture): """Ban some operations for migrations""" def __init__(self, banned_resources=None): diff --git a/nova/tests/unit/compute/test_compute.py b/nova/tests/unit/compute/test_compute.py index 0fb2fb08f7a1..f6e79ea0c1e3 100644 --- a/nova/tests/unit/compute/test_compute.py +++ b/nova/tests/unit/compute/test_compute.py @@ -1490,6 +1490,7 @@ class ComputeTestCase(BaseTestCase, def setUp(self): super(ComputeTestCase, self).setUp() self.useFixture(fixtures.SpawnIsSynchronousFixture()) + self.useFixture(fixtures.SynchronousThreadPoolExecutorFixture()) self.image_api = image_api.API() @@ -6348,7 +6349,7 @@ class ComputeTestCase(BaseTestCase, mock_pre.return_value = migrate_data # start test - migration = objects.Migration() + migration = objects.Migration(uuid=uuids.migration) with mock.patch.object(self.compute.driver, 'cleanup') as mock_cleanup: mock_cleanup.side_effect = test.TestingException diff --git a/nova/tests/unit/compute/test_compute_mgr.py b/nova/tests/unit/compute/test_compute_mgr.py index 15cc7f2156ac..ef34d2349502 100644 --- a/nova/tests/unit/compute/test_compute_mgr.py +++ b/nova/tests/unit/compute/test_compute_mgr.py @@ -697,6 +697,33 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase): mock.call(self.compute.handle_events), mock.call(None)]) mock_driver.cleanup_host.assert_called_once_with(host='fake-mini') + def test_cleanup_live_migrations_in_pool_with_record(self): + fake_future = mock.MagicMock() + fake_instance_uuid = uuids.instance + fake_migration = objects.Migration( + uuid=uuids.migration, instance_uuid=fake_instance_uuid) + fake_migration.save = mock.MagicMock() + self.compute._waiting_live_migrations[fake_instance_uuid] = ( + fake_migration, fake_future) + + with mock.patch.object(self.compute, '_live_migration_executor' + ) as mock_migration_pool: + self.compute._cleanup_live_migrations_in_pool() + + mock_migration_pool.shutdown.assert_called_once_with(wait=False) + self.assertEqual('cancelled', fake_migration.status) + fake_future.cancel.assert_called_once_with() + self.assertEqual({}, self.compute._waiting_live_migrations) + + # test again with Future is None + self.compute._waiting_live_migrations[fake_instance_uuid] = ( + None, None) + self.compute._cleanup_live_migrations_in_pool() + + mock_migration_pool.shutdown.assert_called_with(wait=False) + self.assertEqual(2, mock_migration_pool.shutdown.call_count) + self.assertEqual({}, self.compute._waiting_live_migrations) + def test_init_virt_events_disabled(self): self.flags(handle_virt_lifecycle_events=False, group='workarounds') with mock.patch.object(self.compute.driver, @@ -6238,6 +6265,7 @@ class ComputeManagerErrorsOutMigrationTestCase(test.NoDBTestCase): mock_obj_as_admin.assert_called_once_with() +@ddt.ddt class ComputeManagerMigrationTestCase(test.NoDBTestCase): class TestResizeError(Exception): pass @@ -6895,7 +6923,7 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase): @mock.patch('nova.objects.Migration.save') def _do_it(mock_mig_save): instance = objects.Instance(uuid=uuids.fake) - migration = objects.Migration() + migration = objects.Migration(uuid=uuids.migration) self.compute.live_migration(self.context, mock.sentinel.dest, instance, @@ -6906,10 +6934,10 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase): migration.save.assert_called_once_with() with mock.patch.object(self.compute, - '_live_migration_semaphore') as mock_sem: + '_live_migration_executor') as mock_exc: for i in (1, 2, 3): _do_it() - self.assertEqual(3, mock_sem.__enter__.call_count) + self.assertEqual(3, mock_exc.submit.call_count) def test_max_concurrent_live_limited(self): self.flags(max_concurrent_live_migrations=2) @@ -6919,25 +6947,19 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase): self.flags(max_concurrent_live_migrations=0) self._test_max_concurrent_live() - def test_max_concurrent_live_semaphore_limited(self): + @mock.patch('concurrent.futures.ThreadPoolExecutor') + def test_max_concurrent_live_semaphore_limited(self, mock_executor): self.flags(max_concurrent_live_migrations=123) - self.assertEqual( - 123, - manager.ComputeManager()._live_migration_semaphore.balance) + manager.ComputeManager() + mock_executor.assert_called_once_with(max_workers=123) - def test_max_concurrent_live_semaphore_unlimited(self): - self.flags(max_concurrent_live_migrations=0) - compute = manager.ComputeManager() - self.assertEqual(0, compute._live_migration_semaphore.balance) - self.assertIsInstance(compute._live_migration_semaphore, - compute_utils.UnlimitedSemaphore) - - def test_max_concurrent_live_semaphore_negative(self): - self.flags(max_concurrent_live_migrations=-2) - compute = manager.ComputeManager() - self.assertEqual(0, compute._live_migration_semaphore.balance) - self.assertIsInstance(compute._live_migration_semaphore, - compute_utils.UnlimitedSemaphore) + @ddt.data(0, -2) + def test_max_concurrent_live_semaphore_unlimited(self, max_concurrent): + self.flags(max_concurrent_live_migrations=max_concurrent) + with mock.patch( + 'concurrent.futures.ThreadPoolExecutor') as mock_executor: + manager.ComputeManager() + mock_executor.assert_called_once_with() def test_pre_live_migration_cinder_v3_api(self): # This tests that pre_live_migration with a bdm with an @@ -7109,6 +7131,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase): network_info=network_model.NetworkInfo([ network_model.VIF(uuids.port1), network_model.VIF(uuids.port2) ])) + self.compute._waiting_live_migrations[self.instance.uuid] = ( + self.migration, mock.MagicMock() + ) with mock.patch.object(self.compute.virtapi, 'wait_for_instance_event') as wait_for_event: self.compute._do_live_migration( @@ -7136,6 +7161,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase): self.instance.info_cache = objects.InstanceInfoCache( network_info=network_model.NetworkInfo([ network_model.VIF(uuids.port1)])) + self.compute._waiting_live_migrations[self.instance.uuid] = ( + self.migration, mock.MagicMock() + ) with mock.patch.object( self.compute.virtapi, 'wait_for_instance_event'): self.compute._do_live_migration( @@ -7160,6 +7188,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase): self.instance.info_cache = objects.InstanceInfoCache( network_info=network_model.NetworkInfo([ network_model.VIF(uuids.port1)])) + self.compute._waiting_live_migrations[self.instance.uuid] = ( + self.migration, mock.MagicMock() + ) with mock.patch.object( self.compute.virtapi, 'wait_for_instance_event') as wait_for_event: @@ -7187,6 +7218,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase): self.instance.info_cache = objects.InstanceInfoCache( network_info=network_model.NetworkInfo([ network_model.VIF(uuids.port1)])) + self.compute._waiting_live_migrations[self.instance.uuid] = ( + self.migration, mock.MagicMock() + ) with mock.patch.object( self.compute.virtapi, 'wait_for_instance_event') as wait_for_event: @@ -7218,6 +7252,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase): self.instance.info_cache = objects.InstanceInfoCache( network_info=network_model.NetworkInfo([ network_model.VIF(uuids.port1)])) + self.compute._waiting_live_migrations[self.instance.uuid] = ( + self.migration, mock.MagicMock() + ) with mock.patch.object( self.compute.virtapi, 'wait_for_instance_event') as wait_for_event: @@ -7229,6 +7266,19 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase): self.assertEqual('running', self.migration.status) mock_rollback_live_mig.assert_not_called() + @mock.patch.object(compute_utils, 'add_instance_fault_from_exc') + @mock.patch('nova.compute.utils.notify_about_instance_action') + def test_live_migration_submit_failed(self, mock_notify, mock_exc): + migration = objects.Migration(uuid=uuids.migration) + migration.save = mock.MagicMock() + with mock.patch.object( + self.compute._live_migration_executor, 'submit') as mock_sub: + mock_sub.side_effect = RuntimeError + self.assertRaises(exception.LiveMigrationNotSubmitted, + self.compute.live_migration, self.context, + 'fake', self.instance, True, migration, {}) + self.assertEqual('error', migration.status) + def test_live_migration_force_complete_succeeded(self): migration = objects.Migration() migration.status = 'running' diff --git a/requirements.txt b/requirements.txt index 2e1f9246a8ab..5b48ebb5fefa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -67,3 +67,4 @@ os-service-types>=1.2.0 # Apache-2.0 taskflow>=2.16.0 # Apache-2.0 python-dateutil>=2.5.3 # BSD zVMCloudConnector>=1.1.1;sys_platform!='win32' # Apache 2.0 License +futures>=3.0.0;python_version=='2.7' or python_version=='2.6' # PSF