diff --git a/nova/cmd/conductor.py b/nova/cmd/conductor.py index 16847c2e5610..d4c260fb832c 100644 --- a/nova/cmd/conductor.py +++ b/nova/cmd/conductor.py @@ -26,6 +26,7 @@ import nova.conf from nova import config from nova import objects from nova import service +from nova import utils from nova import version CONF = nova.conf.CONF @@ -43,5 +44,7 @@ def main(): server = service.Service.create(binary='nova-conductor', topic=rpcapi.RPC_TOPIC) workers = CONF.conductor.workers or processutils.get_worker_count() + + utils.destroy_default_executor() service.serve(server, workers=workers) service.wait() diff --git a/nova/conductor/manager.py b/nova/conductor/manager.py index c9067a971312..f407ed4d6722 100644 --- a/nova/conductor/manager.py +++ b/nova/conductor/manager.py @@ -15,6 +15,7 @@ """Handles database requests from other nova services.""" import collections +import concurrent.futures import contextlib import copy import functools @@ -2071,8 +2072,8 @@ class ComputeTaskManager: fields.NotificationPhase.START) clock = timeutils.StopWatch() - threads = CONF.image_cache.precache_concurrency - fetch_executor = utils.create_executor(threads) + cache_image_executor = utils.get_cache_images_executor() + futures = [] hosts_by_cell = {} cells_by_uuid = {} @@ -2143,11 +2144,12 @@ class ComputeTaskManager: {'host': host}) skipped_host(target_ctxt, host, image_ids) continue - utils.spawn_on(fetch_executor, wrap_cache_images, - target_ctxt, host, image_ids) - + future = utils.spawn_on(cache_image_executor, + wrap_cache_images, + target_ctxt, host, image_ids) + futures.append(future) # Wait until all those things finish - fetch_executor.shutdown(wait=True) + concurrent.futures.wait(futures) overall_stats = {'cached': 0, 'existing': 0, 'error': 0, 'unsupported': 0} diff --git a/nova/tests/fixtures/nova.py b/nova/tests/fixtures/nova.py index fa8760d1c1a7..615b76167e17 100644 --- a/nova/tests/fixtures/nova.py +++ b/nova/tests/fixtures/nova.py @@ -1199,12 +1199,15 @@ class IsolatedExecutorFixture(fixtures.Fixture): # Just safety that the previous testcase cleaned up after itself assert utils.SCATTER_GATHER_EXECUTOR is None assert utils.DEFAULT_EXECUTOR is None + assert utils.CACHE_IMAGES_EXECUTOR is None origi_get_scatter_gather = utils.get_scatter_gather_executor origi_default_executor = utils._get_default_executor + origi_get_cache_images_executor = utils.get_cache_images_executor self.executor = None self.scatter_gather_executor = None + self.cache_images_executor = None def _get_default_executor(): self.executor = origi_default_executor() @@ -1232,11 +1235,25 @@ class IsolatedExecutorFixture(fixtures.Fixture): self.addCleanup( lambda: self.do_cleanup_executor(self.scatter_gather_executor)) + def _get_cache_images_executor(): + self.cache_images_executor = origi_get_cache_images_executor() + self.cache_images_executor.name = ( + f"{self.test_case_id}.cache_images") + return self.cache_images_executor + + self.useFixture(fixtures.MonkeyPatch( + 'nova.utils.get_cache_images_executor', + _get_cache_images_executor)) + + self.addCleanup( + lambda: self.do_cleanup_executor(self.cache_images_executor)) + self.addCleanup(self.reset_globals) def reset_globals(self): utils.SCATTER_GATHER_EXECUTOR = None utils.DEFAULT_EXECUTOR = None + utils.CACHE_IMAGES_EXECUTOR = None def do_cleanup_executor(self, executor): # NOTE(gibi): we cannot rely on utils.concurrency_mode_threading diff --git a/nova/utils.py b/nova/utils.py index db00c515bdd5..3202178917d8 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -1220,6 +1220,43 @@ def destroy_scatter_gather_executor(): SCATTER_GATHER_EXECUTOR = None +CACHE_IMAGES_EXECUTOR = None + + +def get_cache_images_executor(): + """Returns the executor used for cache images operations.""" + global CACHE_IMAGES_EXECUTOR + + if not CACHE_IMAGES_EXECUTOR: + max_workers = CONF.image_cache.precache_concurrency + CACHE_IMAGES_EXECUTOR = create_executor(max_workers) + + pname = multiprocessing.current_process().name + executor_name = f"{pname}.cache_images" + CACHE_IMAGES_EXECUTOR.name = executor_name + + LOG.info("The cache images executor %s is initialized", executor_name) + + return CACHE_IMAGES_EXECUTOR + + +def destroy_cache_images_executor(): + """Closes the executor and resets the global to None to allow forked worker + processes to properly init it. + """ + global CACHE_IMAGES_EXECUTOR + if CACHE_IMAGES_EXECUTOR: + LOG.info( + "The cache images thread pool %s is shutting down", + CACHE_IMAGES_EXECUTOR.name) + CACHE_IMAGES_EXECUTOR.shutdown() + LOG.info( + "The cache images thread pool %s is closed", + CACHE_IMAGES_EXECUTOR.name) + + CACHE_IMAGES_EXECUTOR = None + + def _log_executor_stats(executor): if CONF.thread_pool_statistic_period < 0: return