Merge "Add ThreadPool statistics debug log"

This commit is contained in:
Zuul
2025-10-23 23:52:42 +00:00
committed by Gerrit Code Review
7 changed files with 104 additions and 5 deletions

View File

@@ -62,6 +62,8 @@
devstack_local_conf:
post-config:
$WATCHER_CONF:
DEFAULT:
print_thread_pool_stats: true
watcher_cluster_data_model_collectors.compute:
period: 120
watcher_cluster_data_model_collectors.baremetal:
@@ -198,6 +200,8 @@
devstack_local_conf:
post-config:
$WATCHER_CONF:
DEFAULT:
print_thread_pool_stats: true
watcher_datasources:
datasources: prometheus
prometheus_client:

View File

@@ -13,11 +13,16 @@
# under the License.
from apscheduler.executors import pool as pool_executor
import futurist
from oslo_config import cfg
from oslo_log import log
from watcher import eventlet as eventlet_helper
LOG = log.getLogger(__name__)
CONF = cfg.CONF
def get_futurist_pool_executor(max_workers=10):
"""Returns a futurist pool executor
@@ -33,6 +38,38 @@ def get_futurist_pool_executor(max_workers=10):
return futurist.ThreadPoolExecutor(max_workers)
def log_executor_stats(executor, name="unknown"):
    """Log the statistics of the executor.

    This is usually called before submitting a new task.
    """
    # Opt-in only: this pokes at private executor internals, so do
    # nothing unless the operator explicitly enabled the debug option.
    if not CONF.print_thread_pool_stats:
        return
    run_stats: futurist.ExecutorStatistics = executor.statistics
    try:
        if isinstance(executor, futurist.ThreadPoolExecutor):
            workers = executor._workers
            idle_count = len([w for w in workers if w.idle])
            LOG.debug(
                f"State of {name} ThreadPoolExecutor when submitting a new "
                f"task: max_workers: {executor._max_workers:d}, "
                f"workers: {len(workers):d}, "
                f"idle workers: {idle_count:d}, "
                f"queued work: {executor._work_queue.qsize():d}, "
                f"stats: {run_stats}")
        elif isinstance(executor, futurist.GreenThreadPoolExecutor):
            green_pool = executor._pool
            LOG.debug(
                f"State of {name} GreenThreadPoolExecutor when submitting a "
                f"new task: "
                f"workers: {len(green_pool.coroutines_running):d}, "
                f"max_workers: {green_pool.size:d}, "
                f"work queued length: "
                f"{executor._delayed_work.unfinished_tasks:d}, "
                f"stats: {run_stats}")
    except Exception as e:
        # Best effort: the private attributes read above may change
        # between futurist releases, and statistics logging must never
        # break the actual task submission.
        LOG.debug(f"Failed to log executor stats for {name}: {e}")
class APSchedulerThreadPoolExecutor(pool_executor.BasePoolExecutor):
"""Thread pool executor for APScheduler based classes
@@ -47,5 +84,9 @@ class APSchedulerThreadPoolExecutor(pool_executor.BasePoolExecutor):
"""
def __init__(self, max_workers=10):
pool = get_futurist_pool_executor(max_workers)
super().__init__(pool)
self._executor = get_futurist_pool_executor(max_workers)
super().__init__(self._executor)
@property
def executor(self):
return self._executor

View File

@@ -47,6 +47,11 @@ class BackgroundSchedulerService(
eventlet_helper.patch()
super()._main_loop()
def add_job(self, *args, **kwargs):
    """Schedule a job, logging default-pool statistics first.

    The statistics call is a no-op unless the
    ``print_thread_pool_stats`` option is enabled.
    """
    default_pool = executors['default'].executor
    executor.log_executor_stats(default_pool,
                                name="background-scheduler-pool")
    return super().add_job(*args, **kwargs)
def start(self):
"""Start service."""
background.BackgroundScheduler.start(self)

View File

@@ -35,7 +35,14 @@ SERVICE_OPTS = [
),
cfg.IntOpt('service_down_time',
default=90,
help=_('Maximum time since last check-in for up service.'))
help=_('Maximum time since last check-in for up service.')),
cfg.BoolOpt('print_thread_pool_stats',
default=False,
help=_('Enable logging of thread pool executor statistics '
'when submitting tasks. Logs current queue length, '
'number of workers (total, idle or running) and '
'executor runtime statistics. Useful for diagnosing '
'contention and performance issues.'))
]

View File

@@ -51,6 +51,7 @@ class AuditEndpoint:
def trigger_audit(self, context, audit_uuid):
    """Submit an asynchronous run of the audit identified by *audit_uuid*."""
    LOG.debug("Trigger audit %s", audit_uuid)
    # Snapshot pool statistics before queueing the work (no-op unless
    # print_thread_pool_stats is enabled).
    executor.log_executor_stats(self.executor, name="audit-endpoint-pool")
    self.executor.submit(self.do_trigger_audit, context, audit_uuid)

View File

@@ -45,7 +45,9 @@ class DecisionEngineThreadPool(metaclass=service.Singleton):
:return: future to monitor progress of execution
:rtype: :py:class:`futurist.GreenFuture`
"""
# Statistics from the threadpool
executor.log_executor_stats(self._threadpool,
name="decision-engine-pool")
return self._threadpool.submit(fn, *args, **kwargs)
@staticmethod

View File

@@ -36,3 +36,42 @@ class TestFuturistPoolExecutor(base.TestCase):
pool_executor = executor.get_futurist_pool_executor(max_workers=1)
self.assertIsInstance(pool_executor, futurist.ThreadPoolExecutor)
@mock.patch.object(executor.CONF, 'print_thread_pool_stats', True)
class TestLogExecutorStats(base.TestCase):
    """Verify the exact debug message emitted per executor flavor."""

    @mock.patch.object(executor.LOG, 'debug')
    def test_log_executor_stats_eventlet(self, m_log_debug):
        max_workers = 2
        pool = futurist.GreenThreadPoolExecutor(max_workers)
        executor.log_executor_stats(pool,
                                    name="test-threadpool-eventlet")
        # Build the expected message after the call, mirroring the
        # attribute reads done inside log_executor_stats.
        expected = (
            f"State of test-threadpool-eventlet GreenThreadPoolExecutor when "
            f"submitting a new task: "
            f"workers: {len(pool._pool.coroutines_running):d}, "
            f"max_workers: {max_workers:d}, "
            f"work queued length: "
            f"{pool._delayed_work.unfinished_tasks:d}, "
            f"stats: {pool.statistics}")
        m_log_debug.assert_called_once_with(expected)

    @mock.patch.object(executor.LOG, 'debug')
    def test_log_executor_stats_threading(self, m_log_debug):
        max_workers = 3
        pool = futurist.ThreadPoolExecutor(max_workers)
        executor.log_executor_stats(pool,
                                    name="test-threadpool-threading")
        expected = (
            f"State of test-threadpool-threading ThreadPoolExecutor when "
            f"submitting a new task: "
            f"max_workers: {max_workers:d}, "
            f"workers: {len(pool._workers):d}, "
            f"idle workers: "
            f"{len([w for w in pool._workers if w.idle]):d}, "
            f"queued work: {pool._work_queue.qsize():d}, "
            f"stats: {pool.statistics}")
        m_log_debug.assert_called_once_with(expected)