Use ThreadPoolExecutor for max_concurrent_live_migrations

This changes the max_concurrent_live_migrations handling
to use a ThreadPoolExecutor so that we can control a bounded
pool of Futures in order to cancel queued live migrations
later in this series.

There is a slight functional difference in the unlimited
case since, starting in Python 3.5, ThreadPoolExecutor
defaults to ncpu * 5 concurrently running threads. However,
max_concurrent_live_migrations defaults to 1, and even in the
unlimited case, assuming compute hosts run with 32 physical
CPUs on average, you'd be looking at a maximum of 160
concurrently running live migrations, which is probably way
above what anyone would consider sane.
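
For reference, here is a minimal standalone sketch (illustrative only, not
part of this change) of the two ThreadPoolExecutor behaviors relied on
here: a queued-but-not-started Future can be cancelled, while a pool built
with no max_workers argument gets the py3.5+ ncpu * 5 default:

from concurrent import futures
import time

def work(i):
    time.sleep(1)
    return i

# Bounded pool: one job runs at a time, any others wait in the queue.
executor = futures.ThreadPoolExecutor(max_workers=1)
running = executor.submit(work, 0)
queued = executor.submit(work, 1)
time.sleep(0.1)  # give the worker thread time to pick up the first job

# A Future still waiting in the queue can be cancelled; a running one can't.
print(queued.cancel())   # True - never started, moves to cancelled
print(running.cancel())  # False - already executing

# With no max_workers, py3.5+ defaults to ncpu * 5 worker threads.
unlimited = futures.ThreadPoolExecutor()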

Co-Authored-By: Matt Riedemann <mriedem.os@gmail.com>

Part of blueprint abort-live-migration-in-queued-status

Change-Id: Ia9ea1e164fb3b4a386405538eed58d94ad115172
Kevin_Zheng 2018-04-23 15:41:21 +08:00 committed by Matt Riedemann
parent b2dafb1ee8
commit 79dac41fee
7 changed files with 151 additions and 36 deletions

View File

@@ -33,6 +33,7 @@ fixtures==3.0.0
flake8==2.5.5
future==0.16.0
futurist==1.6.0
futures==3.0.0
gabbi==1.35.0
gitdb2==2.0.3
GitPython==2.1.8

View File

@@ -27,6 +27,9 @@ terminating it.
import base64
import binascii
# If py2, concurrent.futures comes from the futures library; otherwise it
# comes from the py3 standard library.
from concurrent import futures
import contextlib
import functools
import inspect
@@ -521,10 +524,16 @@ class ComputeManager(manager.Manager):
else:
self._build_semaphore = compute_utils.UnlimitedSemaphore()
if max(CONF.max_concurrent_live_migrations, 0) != 0:
self._live_migration_semaphore = eventlet.semaphore.Semaphore(
CONF.max_concurrent_live_migrations)
self._live_migration_executor = futures.ThreadPoolExecutor(
max_workers=CONF.max_concurrent_live_migrations)
else:
self._live_migration_semaphore = compute_utils.UnlimitedSemaphore()
# Starting in python 3.5, this is technically bounded, but it's
# ncpu * 5 which is probably much higher than anyone would sanely
# use for concurrently running live migrations.
self._live_migration_executor = futures.ThreadPoolExecutor()
# This is a dict, keyed by migration uuid, to a two-item tuple of
# migration object and Future for the queued live migration.
self._waiting_live_migrations = {}
super(ComputeManager, self).__init__(service_name="compute",
*args, **kwargs)
@@ -1152,6 +1161,26 @@ class ComputeManager(manager.Manager):
self.driver.register_event_listener(None)
self.instance_events.cancel_all_events()
self.driver.cleanup_host(host=self.host)
self._cleanup_live_migrations_in_pool()
def _cleanup_live_migrations_in_pool(self):
# Shutdown the pool so we don't get new requests.
self._live_migration_executor.shutdown(wait=False)
# For any queued migrations, cancel the migration and update
# its status.
for migration, future in self._waiting_live_migrations.values():
# If we got here before the Future was submitted then we need
# to move on since there isn't anything we can do.
if future is None:
continue
if future.cancel():
self._set_migration_status(migration, 'cancelled')
LOG.info('Successfully cancelled queued live migration.',
instance_uuid=migration.instance_uuid)
else:
LOG.warning('Unable to cancel live migration.',
instance_uuid=migration.instance_uuid)
self._waiting_live_migrations.clear()
def pre_start_hook(self):
"""After the service is initialized, but before we fully bring
@@ -6146,6 +6175,9 @@ class ComputeManager(manager.Manager):
# done on source/destination. For now, this is just here for status
# reporting
self._set_migration_status(migration, 'preparing')
# NOTE(Kevin_Zheng): The migration is no longer in the `queued` status
# so let's remove it from the mapping.
self._waiting_live_migrations.pop(instance.uuid)
class _BreakWaitForInstanceEvent(Exception):
"""Used as a signal to stop waiting for the network-vif-plugged
@@ -6257,18 +6289,23 @@ class ComputeManager(manager.Manager):
"""
self._set_migration_status(migration, 'queued')
def dispatch_live_migration(*args, **kwargs):
with self._live_migration_semaphore:
self._do_live_migration(*args, **kwargs)
# NOTE(danms): We spawn here to return the RPC worker thread back to
# the pool. Since what follows could take a really long time, we don't
# want to tie up RPC workers.
utils.spawn_n(dispatch_live_migration,
context, dest, instance,
block_migration, migration,
migrate_data)
# NOTE(Kevin_Zheng): Submit the live_migration job to the pool and
# put the returned Future object into dict mapped with migration.uuid
# in order to be able to track and abort it in the future.
self._waiting_live_migrations[instance.uuid] = (None, None)
try:
future = self._live_migration_executor.submit(
self._do_live_migration, context, dest, instance,
block_migration, migration, migrate_data)
self._waiting_live_migrations[instance.uuid] = (migration, future)
except RuntimeError:
# ThreadPoolExecutor.submit will raise RuntimeError if the pool
# is shutdown, which happens in _cleanup_live_migrations_in_pool.
LOG.info('Migration %s failed to submit as the compute service '
'is shutting down.', migration.uuid, instance=instance)
self._set_migration_status(migration, 'error')
raise exception.LiveMigrationNotSubmitted(
migration_uuid=migration.uuid, instance_uuid=instance.uuid)
@wrap_exception()
@wrap_instance_event(prefix='compute')

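And a standalone sketch (again illustrative, not Nova code) of the
submit-after-shutdown behavior that the RuntimeError handler in
live_migration above depends on:

from concurrent import futures

executor = futures.ThreadPoolExecutor(max_workers=1)
executor.shutdown(wait=False)
try:
    executor.submit(print, 'too late')
except RuntimeError:
    # The stdlib refuses new work once shutdown() has been called; the
    # compute manager turns this into LiveMigrationNotSubmitted.
    print('submit rejected after shutdown')
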
View File

@@ -1827,6 +1827,11 @@ class InvalidWatchdogAction(Invalid):
msg_fmt = _("Provided watchdog action (%(action)s) is not supported.")
class LiveMigrationNotSubmitted(NovaException):
msg_fmt = _("Failed to submit live migration %(migration_uuid)s for "
"instance %(instance_uuid)s for processing.")
class SelectionObjectsWithOldRPCVersionNotSupported(NovaException):
msg_fmt = _("Requests for Selection objects with alternates are not "
"supported in select_destinations() before RPC version 4.5; "

View File

@@ -1010,6 +1010,26 @@ class SpawnIsSynchronousFixture(fixtures.Fixture):
'nova.utils.spawn', _FakeGreenThread))
class SynchronousThreadPoolExecutorFixture(fixtures.Fixture):
"""Make ThreadPoolExecutor.submit() synchronous.
The function passed to submit() will be executed and a mock.Mock
object will be returned as the Future where Future.result() will
return the result of the call to the submitted function.
"""
def setUp(self):
super(SynchronousThreadPoolExecutorFixture, self).setUp()
def fake_submit(_self, fn, *args, **kwargs):
result = fn(*args, **kwargs)
future = mock.Mock(spec='concurrent.futures.Future')
future.return_value.result.return_value = result
return future
self.useFixture(fixtures.MonkeyPatch(
'concurrent.futures.ThreadPoolExecutor.submit',
fake_submit))
class BannedDBSchemaOperations(fixtures.Fixture):
"""Ban some operations for migrations"""
def __init__(self, banned_resources=None):

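A hypothetical usage sketch for the fixture above (the test class and
method names are made up; it assumes nova's test base class and the
nova.tests.fixtures module): with the fixture installed, submit() runs the
callable inline, so assertions can follow immediately:

from concurrent import futures

from nova import test
from nova.tests import fixtures as nova_fixtures

class FakePoolTestCase(test.NoDBTestCase):
    def setUp(self):
        super(FakePoolTestCase, self).setUp()
        # After this, ThreadPoolExecutor.submit() runs the work inline and
        # returns a mock Future instead of scheduling a worker thread.
        self.useFixture(nova_fixtures.SynchronousThreadPoolExecutorFixture())

    def test_submit_runs_inline(self):
        calls = []
        pool = futures.ThreadPoolExecutor(max_workers=1)
        pool.submit(calls.append, 'ran')
        # The submitted callable already ran by the time submit() returned.
        self.assertEqual(['ran'], calls)
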
View File

@@ -1490,6 +1490,7 @@ class ComputeTestCase(BaseTestCase,
def setUp(self):
super(ComputeTestCase, self).setUp()
self.useFixture(fixtures.SpawnIsSynchronousFixture())
self.useFixture(fixtures.SynchronousThreadPoolExecutorFixture())
self.image_api = image_api.API()
@@ -6348,7 +6349,7 @@ class ComputeTestCase(BaseTestCase,
mock_pre.return_value = migrate_data
# start test
migration = objects.Migration()
migration = objects.Migration(uuid=uuids.migration)
with mock.patch.object(self.compute.driver,
'cleanup') as mock_cleanup:
mock_cleanup.side_effect = test.TestingException

View File

@@ -697,6 +697,33 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase):
mock.call(self.compute.handle_events), mock.call(None)])
mock_driver.cleanup_host.assert_called_once_with(host='fake-mini')
def test_cleanup_live_migrations_in_pool_with_record(self):
fake_future = mock.MagicMock()
fake_instance_uuid = uuids.instance
fake_migration = objects.Migration(
uuid=uuids.migration, instance_uuid=fake_instance_uuid)
fake_migration.save = mock.MagicMock()
self.compute._waiting_live_migrations[fake_instance_uuid] = (
fake_migration, fake_future)
with mock.patch.object(self.compute, '_live_migration_executor'
) as mock_migration_pool:
self.compute._cleanup_live_migrations_in_pool()
mock_migration_pool.shutdown.assert_called_once_with(wait=False)
self.assertEqual('cancelled', fake_migration.status)
fake_future.cancel.assert_called_once_with()
self.assertEqual({}, self.compute._waiting_live_migrations)
# test again with Future is None
self.compute._waiting_live_migrations[fake_instance_uuid] = (
None, None)
self.compute._cleanup_live_migrations_in_pool()
mock_migration_pool.shutdown.assert_called_with(wait=False)
self.assertEqual(2, mock_migration_pool.shutdown.call_count)
self.assertEqual({}, self.compute._waiting_live_migrations)
def test_init_virt_events_disabled(self):
self.flags(handle_virt_lifecycle_events=False, group='workarounds')
with mock.patch.object(self.compute.driver,
@@ -6238,6 +6265,7 @@ class ComputeManagerErrorsOutMigrationTestCase(test.NoDBTestCase):
mock_obj_as_admin.assert_called_once_with()
@ddt.ddt
class ComputeManagerMigrationTestCase(test.NoDBTestCase):
class TestResizeError(Exception):
pass
@@ -6895,7 +6923,7 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
@mock.patch('nova.objects.Migration.save')
def _do_it(mock_mig_save):
instance = objects.Instance(uuid=uuids.fake)
migration = objects.Migration()
migration = objects.Migration(uuid=uuids.migration)
self.compute.live_migration(self.context,
mock.sentinel.dest,
instance,
@@ -6906,10 +6934,10 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
migration.save.assert_called_once_with()
with mock.patch.object(self.compute,
'_live_migration_semaphore') as mock_sem:
'_live_migration_executor') as mock_exc:
for i in (1, 2, 3):
_do_it()
self.assertEqual(3, mock_sem.__enter__.call_count)
self.assertEqual(3, mock_exc.submit.call_count)
def test_max_concurrent_live_limited(self):
self.flags(max_concurrent_live_migrations=2)
@@ -6919,25 +6947,19 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
self.flags(max_concurrent_live_migrations=0)
self._test_max_concurrent_live()
def test_max_concurrent_live_semaphore_limited(self):
@mock.patch('concurrent.futures.ThreadPoolExecutor')
def test_max_concurrent_live_semaphore_limited(self, mock_executor):
self.flags(max_concurrent_live_migrations=123)
self.assertEqual(
123,
manager.ComputeManager()._live_migration_semaphore.balance)
manager.ComputeManager()
mock_executor.assert_called_once_with(max_workers=123)
def test_max_concurrent_live_semaphore_unlimited(self):
self.flags(max_concurrent_live_migrations=0)
compute = manager.ComputeManager()
self.assertEqual(0, compute._live_migration_semaphore.balance)
self.assertIsInstance(compute._live_migration_semaphore,
compute_utils.UnlimitedSemaphore)
def test_max_concurrent_live_semaphore_negative(self):
self.flags(max_concurrent_live_migrations=-2)
compute = manager.ComputeManager()
self.assertEqual(0, compute._live_migration_semaphore.balance)
self.assertIsInstance(compute._live_migration_semaphore,
compute_utils.UnlimitedSemaphore)
@ddt.data(0, -2)
def test_max_concurrent_live_semaphore_unlimited(self, max_concurrent):
self.flags(max_concurrent_live_migrations=max_concurrent)
with mock.patch(
'concurrent.futures.ThreadPoolExecutor') as mock_executor:
manager.ComputeManager()
mock_executor.assert_called_once_with()
def test_pre_live_migration_cinder_v3_api(self):
# This tests that pre_live_migration with a bdm with an
@@ -7109,6 +7131,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
network_info=network_model.NetworkInfo([
network_model.VIF(uuids.port1), network_model.VIF(uuids.port2)
]))
self.compute._waiting_live_migrations[self.instance.uuid] = (
self.migration, mock.MagicMock()
)
with mock.patch.object(self.compute.virtapi,
'wait_for_instance_event') as wait_for_event:
self.compute._do_live_migration(
@@ -7136,6 +7161,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
self.instance.info_cache = objects.InstanceInfoCache(
network_info=network_model.NetworkInfo([
network_model.VIF(uuids.port1)]))
self.compute._waiting_live_migrations[self.instance.uuid] = (
self.migration, mock.MagicMock()
)
with mock.patch.object(
self.compute.virtapi, 'wait_for_instance_event'):
self.compute._do_live_migration(
@@ -7160,6 +7188,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
self.instance.info_cache = objects.InstanceInfoCache(
network_info=network_model.NetworkInfo([
network_model.VIF(uuids.port1)]))
self.compute._waiting_live_migrations[self.instance.uuid] = (
self.migration, mock.MagicMock()
)
with mock.patch.object(
self.compute.virtapi,
'wait_for_instance_event') as wait_for_event:
@@ -7187,6 +7218,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
self.instance.info_cache = objects.InstanceInfoCache(
network_info=network_model.NetworkInfo([
network_model.VIF(uuids.port1)]))
self.compute._waiting_live_migrations[self.instance.uuid] = (
self.migration, mock.MagicMock()
)
with mock.patch.object(
self.compute.virtapi,
'wait_for_instance_event') as wait_for_event:
@@ -7218,6 +7252,9 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
self.instance.info_cache = objects.InstanceInfoCache(
network_info=network_model.NetworkInfo([
network_model.VIF(uuids.port1)]))
self.compute._waiting_live_migrations[self.instance.uuid] = (
self.migration, mock.MagicMock()
)
with mock.patch.object(
self.compute.virtapi,
'wait_for_instance_event') as wait_for_event:
@@ -7229,6 +7266,19 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
self.assertEqual('running', self.migration.status)
mock_rollback_live_mig.assert_not_called()
@mock.patch.object(compute_utils, 'add_instance_fault_from_exc')
@mock.patch('nova.compute.utils.notify_about_instance_action')
def test_live_migration_submit_failed(self, mock_notify, mock_exc):
migration = objects.Migration(uuid=uuids.migration)
migration.save = mock.MagicMock()
with mock.patch.object(
self.compute._live_migration_executor, 'submit') as mock_sub:
mock_sub.side_effect = RuntimeError
self.assertRaises(exception.LiveMigrationNotSubmitted,
self.compute.live_migration, self.context,
'fake', self.instance, True, migration, {})
self.assertEqual('error', migration.status)
def test_live_migration_force_complete_succeeded(self):
migration = objects.Migration()
migration.status = 'running'

View File

@@ -67,3 +67,4 @@ os-service-types>=1.2.0 # Apache-2.0
taskflow>=2.16.0 # Apache-2.0
python-dateutil>=2.5.3 # BSD
zVMCloudConnector>=1.1.1;sys_platform!='win32' # Apache 2.0 License
futures>=3.0.0;python_version=='2.7' or python_version=='2.6' # PSF
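
The trailing environment marker restricts the futures backport to py2
interpreters. A quick illustrative check of how such a marker evaluates
(using the packaging library, which pip relies on for this):

from packaging.markers import Marker

m = Marker("python_version=='2.7' or python_version=='2.6'")
print(m.evaluate())  # False on py3, so the backport is not installed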