Switch nova-conductor to use global executor

Change-Id: I0700a907746712965860455909d976f6eab0d9bd
Signed-off-by: Kamil Sambor <kamil.sambor@gmail.com>
This commit is contained in:
Kamil Sambor
2025-09-29 12:09:58 +02:00
parent ec426532c3
commit f6314d9027
4 changed files with 65 additions and 6 deletions

View File

@@ -26,6 +26,7 @@ import nova.conf
from nova import config
from nova import objects
from nova import service
from nova import utils
from nova import version
CONF = nova.conf.CONF
@@ -43,5 +44,7 @@ def main():
server = service.Service.create(binary='nova-conductor',
topic=rpcapi.RPC_TOPIC)
workers = CONF.conductor.workers or processutils.get_worker_count()
utils.destroy_default_executor()
service.serve(server, workers=workers)
service.wait()

View File

@@ -15,6 +15,7 @@
"""Handles database requests from other nova services."""
import collections
import concurrent.futures
import contextlib
import copy
import functools
@@ -2071,8 +2072,8 @@ class ComputeTaskManager:
fields.NotificationPhase.START)
clock = timeutils.StopWatch()
threads = CONF.image_cache.precache_concurrency
fetch_executor = utils.create_executor(threads)
cache_image_executor = utils.get_cache_images_executor()
futures = []
hosts_by_cell = {}
cells_by_uuid = {}
@@ -2143,11 +2144,12 @@ class ComputeTaskManager:
{'host': host})
skipped_host(target_ctxt, host, image_ids)
continue
utils.spawn_on(fetch_executor, wrap_cache_images,
target_ctxt, host, image_ids)
future = utils.spawn_on(cache_image_executor,
wrap_cache_images,
target_ctxt, host, image_ids)
futures.append(future)
# Wait until all those things finish
fetch_executor.shutdown(wait=True)
concurrent.futures.wait(futures)
overall_stats = {'cached': 0, 'existing': 0, 'error': 0,
'unsupported': 0}

View File

@@ -1199,12 +1199,15 @@ class IsolatedExecutorFixture(fixtures.Fixture):
# Just safety that the previous testcase cleaned up after itself
assert utils.SCATTER_GATHER_EXECUTOR is None
assert utils.DEFAULT_EXECUTOR is None
assert utils.CACHE_IMAGES_EXECUTOR is None
origi_get_scatter_gather = utils.get_scatter_gather_executor
origi_default_executor = utils._get_default_executor
origi_get_cache_images_executor = utils.get_cache_images_executor
self.executor = None
self.scatter_gather_executor = None
self.cache_images_executor = None
def _get_default_executor():
self.executor = origi_default_executor()
@@ -1232,11 +1235,25 @@ class IsolatedExecutorFixture(fixtures.Fixture):
self.addCleanup(
lambda: self.do_cleanup_executor(self.scatter_gather_executor))
def _get_cache_images_executor():
self.cache_images_executor = origi_get_cache_images_executor()
self.cache_images_executor.name = (
f"{self.test_case_id}.cache_images")
return self.cache_images_executor
self.useFixture(fixtures.MonkeyPatch(
'nova.utils.get_cache_images_executor',
_get_cache_images_executor))
self.addCleanup(
lambda: self.do_cleanup_executor(self.cache_images_executor))
self.addCleanup(self.reset_globals)
def reset_globals(self):
utils.SCATTER_GATHER_EXECUTOR = None
utils.DEFAULT_EXECUTOR = None
utils.CACHE_IMAGES_EXECUTOR = None
def do_cleanup_executor(self, executor):
# NOTE(gibi): we cannot rely on utils.concurrency_mode_threading

View File

@@ -1220,6 +1220,43 @@ def destroy_scatter_gather_executor():
SCATTER_GATHER_EXECUTOR = None
CACHE_IMAGES_EXECUTOR = None
def get_cache_images_executor():
"""Returns the executor used for cache images operations."""
global CACHE_IMAGES_EXECUTOR
if not CACHE_IMAGES_EXECUTOR:
max_workers = CONF.image_cache.precache_concurrency
CACHE_IMAGES_EXECUTOR = create_executor(max_workers)
pname = multiprocessing.current_process().name
executor_name = f"{pname}.cache_images"
CACHE_IMAGES_EXECUTOR.name = executor_name
LOG.info("The cache images executor %s is initialized", executor_name)
return CACHE_IMAGES_EXECUTOR
def destroy_cache_images_executor():
"""Closes the executor and resets the global to None to allow forked worker
processes to properly init it.
"""
global CACHE_IMAGES_EXECUTOR
if CACHE_IMAGES_EXECUTOR:
LOG.info(
"The cache images thread pool %s is shutting down",
CACHE_IMAGES_EXECUTOR.name)
CACHE_IMAGES_EXECUTOR.shutdown()
LOG.info(
"The cache images thread pool %s is closed",
CACHE_IMAGES_EXECUTOR.name)
CACHE_IMAGES_EXECUTOR = None
def _log_executor_stats(executor):
if CONF.thread_pool_statistic_period < 0:
return