Switch nova-conductor to use ThreadPoolExecutor

This is a pure refactor, so the absence of any unit test changes
actually signals that the refactor did not change the existing
behavior, which is good.

The unit test run on this patch only covers the eventlet mode, but
higher in the series a separate job runs the unit tests in native
threading mode, which complements the coverage for this patch.

Change-Id: Iafc96c93a0d4c406b77902942b2940653441fe38
Signed-off-by: Kamil Sambor <kamil.sambor@gmail.com>
Author: Kamil Sambor <kamil.sambor@gmail.com>
Date:   2025-08-07 13:54:13 +02:00
Parent: 640782207c
Commit: 9f58f596db

3 changed files with 37 additions and 33 deletions
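
The refactor hinges on the fact that futurist's ThreadPoolExecutor and
GreenThreadPoolExecutor expose the same concurrent.futures-style
interface, which is what lets a single factory pick the backend without
touching call sites. A minimal standalone sketch of the pattern
(make_executor and threading_mode are illustrative names, not part of
the patch):

    import futurist

    def make_executor(threading_mode, max_workers):
        # Both classes implement submit()/shutdown(), so callers stay
        # agnostic to which concurrency backend is in use.
        if threading_mode:
            return futurist.ThreadPoolExecutor(max_workers=max_workers)
        return futurist.GreenThreadPoolExecutor(max_workers=max_workers)

    executor = make_executor(threading_mode=True, max_workers=4)
    future = executor.submit(lambda: 40 + 2)
    assert future.result() == 42
    executor.shutdown(wait=True)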

--- a/nova/conductor/manager.py
+++ b/nova/conductor/manager.py
@@ -19,9 +19,9 @@ import contextlib
 import copy
 import functools
 import sys
+import threading
 import typing as ty
 
-import futurist
 from keystoneauth1 import exceptions as ks_exc
 from oslo_config import cfg
 from oslo_db import exception as db_exc
@@ -2062,7 +2062,7 @@ class ComputeTaskManager:
             the host list
         :param image_id: The IDs of the image to cache
         """
-
+        local_lock = threading.Lock()
         # TODO(mriedem): Consider including the list of images in the
         # notification payload.
         compute_utils.notify_about_aggregate_action(
@@ -2072,7 +2072,7 @@ class ComputeTaskManager:
clock = timeutils.StopWatch() clock = timeutils.StopWatch()
threads = CONF.image_cache.precache_concurrency threads = CONF.image_cache.precache_concurrency
fetch_executor = futurist.GreenThreadPoolExecutor(max_workers=threads) fetch_executor = utils.create_executor(threads)
hosts_by_cell = {} hosts_by_cell = {}
cells_by_uuid = {} cells_by_uuid = {}
@@ -2099,24 +2099,24 @@ class ComputeTaskManager:
} }
def host_completed(context, host, result): def host_completed(context, host, result):
for image_id, status in result.items(): with local_lock:
cached, existing, error, unsupported = stats[image_id] for image_id, status in result.items():
if status == 'error': cached, existing, error, unsupported = stats[image_id]
failed_images[image_id] += 1 if status == 'error':
error += 1 failed_images[image_id] += 1
elif status == 'cached': error += 1
cached += 1 elif status == 'cached':
elif status == 'existing': cached += 1
existing += 1 elif status == 'existing':
elif status == 'unsupported': existing += 1
unsupported += 1 elif status == 'unsupported':
stats[image_id] = (cached, existing, error, unsupported) unsupported += 1
stats[image_id] = (cached, existing, error, unsupported)
host_stats['completed'] += 1 host_stats['completed'] += 1
compute_utils.notify_about_aggregate_cache(context, aggregate, compute_utils.notify_about_aggregate_cache(
host, result, context, aggregate, host, result,
host_stats['completed'], host_stats['completed'], host_stats['total'])
host_stats['total'])
def wrap_cache_images(ctxt, host, image_ids): def wrap_cache_images(ctxt, host, image_ids):
result = self.compute_rpcapi.cache_images( result = self.compute_rpcapi.cache_images(
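
The new local_lock is the one behavioral subtlety of the patch:
host_completed runs as a completion callback for each host's fetch
task, and while eventlet's cooperative scheduling implicitly serialized
the read-modify-write on the shared stats and host_stats dicts, native
worker threads can interleave it and lose updates. A self-contained,
stdlib-only sketch of the hazard being guarded against (names
illustrative):

    import threading
    from concurrent.futures import ThreadPoolExecutor

    stats = {'img': (0, 0, 0, 0)}
    lock = threading.Lock()

    def host_completed(result):
        # Without the lock, two threads can both read the old tuple
        # and one increment is lost; with it, the update is atomic.
        with lock:
            for image_id, status in result.items():
                cached, existing, error, unsupported = stats[image_id]
                if status == 'cached':
                    cached += 1
                stats[image_id] = (cached, existing, error, unsupported)

    with ThreadPoolExecutor(max_workers=8) as pool:
        for _ in range(1000):
            pool.submit(host_completed, {'img': 'cached'})

    assert stats['img'][0] == 1000

Taking the lock around the whole loop also keeps the completed counter
consistent with the per-image stats at the moment the notification is
sent.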

--- a/nova/utils.py
+++ b/nova/utils.py
@@ -104,19 +104,23 @@ def destroy_default_executor():
     DEFAULT_EXECUTOR = None
 
 
+def create_executor(max_workers):
+    if concurrency_mode_threading():
+        executor = futurist.ThreadPoolExecutor(max_workers)
+    else:
+        executor = futurist.GreenThreadPoolExecutor(max_workers)
+    return executor
+
+
 def _get_default_executor():
     global DEFAULT_EXECUTOR
     if not DEFAULT_EXECUTOR:
-        if concurrency_mode_threading():
-            DEFAULT_EXECUTOR = futurist.ThreadPoolExecutor(
-                CONF.default_thread_pool_size
-            )
-        else:
-            DEFAULT_EXECUTOR = futurist.GreenThreadPoolExecutor(
-                CONF.default_green_pool_size
-            )
+        max_workers = (
+            CONF.default_thread_pool_size if concurrency_mode_threading()
+            else CONF.default_green_pool_size
+        )
+        DEFAULT_EXECUTOR = create_executor(max_workers)
         pname = multiprocessing.current_process().name
         executor_name = f"{pname}.default"
         DEFAULT_EXECUTOR.name = executor_name
@@ -1183,11 +1187,11 @@ def get_scatter_gather_executor():
     global SCATTER_GATHER_EXECUTOR
     if not SCATTER_GATHER_EXECUTOR:
-        if concurrency_mode_threading():
-            SCATTER_GATHER_EXECUTOR = futurist.ThreadPoolExecutor(
-                CONF.cell_worker_thread_pool_size)
-        else:
-            SCATTER_GATHER_EXECUTOR = futurist.GreenThreadPoolExecutor()
+        max_workers = (
+            CONF.cell_worker_thread_pool_size
+            if concurrency_mode_threading() else 1000
+        )
+        SCATTER_GATHER_EXECUTOR = create_executor(max_workers)
         pname = multiprocessing.current_process().name
         executor_name = f"{pname}.cell_worker"
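
Two details worth noting. The literal 1000 in the scatter-gather branch
mirrors GreenThreadPoolExecutor's default max_workers in futurist, so
funneling the eventlet path through create_executor(max_workers) leaves
the previous pool size unchanged. Also, create_executor hands ownership
to the caller, so a short-lived pool such as the conductor's
fetch_executor should be shut down once its work completes; roughly
(the task payload here is illustrative):

    from nova import utils  # assumes the patched nova tree on sys.path

    executor = utils.create_executor(8)
    try:
        futures = [executor.submit(pow, 2, n) for n in range(16)]
        results = [f.result() for f in futures]  # re-raises task errors
    finally:
        executor.shutdown(wait=True)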

--- a/requirements.txt
+++ b/requirements.txt
@@ -60,6 +60,6 @@ cursive>=0.2.1 # Apache-2.0
 retrying>=1.3.3 # Apache-2.0
 os-service-types>=1.7.0 # Apache-2.0
 python-dateutil>=2.7.0 # BSD
-futurist>=1.8.0 # Apache-2.0
+futurist>=3.2.1 # Apache-2.0
 openstacksdk>=4.4.0 # Apache-2.0
 PyYAML>=5.1 # MIT