From f8ede24051e907c809b724d6b8fdba5c93aa528d Mon Sep 17 00:00:00 2001 From: Douglas Viroel Date: Fri, 12 Sep 2025 16:17:15 -0300 Subject: [PATCH] Add ThreadPool statistics debug log Similar with the approach implemented in nova[1], this patch adds a debug log with statistics from all ThreadPools created by watcher. It logs its current size, max size,.. It can be enabled with a config option which is disabled by default. Assisted-By: Cursor (gpt-5) [1] https://github.com/openstack/nova/commit/ae3ae0700d507644db951a68d4a44a60d8d11e06 Change-Id: I1a67ab33c3040cfab0538f9d7cf9eb14ee2dd302 Signed-off-by: Douglas Viroel --- .zuul.yaml | 4 ++ watcher/common/executor.py | 47 +++++++++++++++++-- watcher/common/scheduling.py | 5 ++ watcher/conf/service.py | 9 +++- .../messaging/audit_endpoint.py | 1 + watcher/decision_engine/threading.py | 4 +- watcher/tests/common/test_executor.py | 39 +++++++++++++++ 7 files changed, 104 insertions(+), 5 deletions(-) diff --git a/.zuul.yaml b/.zuul.yaml index 0df370723..3954e5798 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -62,6 +62,8 @@ devstack_local_conf: post-config: $WATCHER_CONF: + DEFAULT: + print_thread_pool_stats: true watcher_cluster_data_model_collectors.compute: period: 120 watcher_cluster_data_model_collectors.baremetal: @@ -199,6 +201,8 @@ devstack_local_conf: post-config: $WATCHER_CONF: + DEFAULT: + print_thread_pool_stats: true watcher_datasources: datasources: prometheus prometheus_client: diff --git a/watcher/common/executor.py b/watcher/common/executor.py index d9c651215..fa491e3ac 100644 --- a/watcher/common/executor.py +++ b/watcher/common/executor.py @@ -13,11 +13,16 @@ # under the License. from apscheduler.executors import pool as pool_executor - import futurist +from oslo_config import cfg +from oslo_log import log from watcher import eventlet as eventlet_helper +LOG = log.getLogger(__name__) + +CONF = cfg.CONF + def get_futurist_pool_executor(max_workers=10): """Returns a futurist pool executor @@ -33,6 +38,38 @@ def get_futurist_pool_executor(max_workers=10): return futurist.ThreadPoolExecutor(max_workers) +def log_executor_stats(executor, name="unknown"): + """Log the statistics of the executor. + + This is usually called before submitting a new task. + """ + if not CONF.print_thread_pool_stats: + return + + stats: futurist.ExecutorStatistics = executor.statistics + try: + if isinstance(executor, futurist.ThreadPoolExecutor): + LOG.debug( + f"State of {name} ThreadPoolExecutor when submitting a new " + f"task: max_workers: {executor._max_workers:d}, " + f"workers: {len(executor._workers):d}, " + "idle workers: " + f"{len([w for w in executor._workers if w.idle]):d}, " + f"queued work: {executor._work_queue.qsize():d}, " + f"stats: {stats}") + elif isinstance(executor, futurist.GreenThreadPoolExecutor): + LOG.debug( + f"State of {name} GreenThreadPoolExecutor when submitting a " + "new task: " + f"workers: {len(executor._pool.coroutines_running):d}, " + f"max_workers: {executor._pool.size:d}, " + f"work queued length: " + f"{executor._delayed_work.unfinished_tasks:d}, " + f"stats: {stats}") + except Exception as e: + LOG.debug(f"Failed to log executor stats for {name}: {e}") + + class APSchedulerThreadPoolExecutor(pool_executor.BasePoolExecutor): """Thread pool executor for APScheduler based classes @@ -47,5 +84,9 @@ class APSchedulerThreadPoolExecutor(pool_executor.BasePoolExecutor): """ def __init__(self, max_workers=10): - pool = get_futurist_pool_executor(max_workers) - super().__init__(pool) + self._executor = get_futurist_pool_executor(max_workers) + super().__init__(self._executor) + + @property + def executor(self): + return self._executor diff --git a/watcher/common/scheduling.py b/watcher/common/scheduling.py index c3b3e2a03..6addb7c27 100644 --- a/watcher/common/scheduling.py +++ b/watcher/common/scheduling.py @@ -47,6 +47,11 @@ class BackgroundSchedulerService( eventlet_helper.patch() super()._main_loop() + def add_job(self, *args, **kwargs): + executor.log_executor_stats(executors['default'].executor, + name="background-scheduler-pool") + return super().add_job(*args, **kwargs) + def start(self): """Start service.""" background.BackgroundScheduler.start(self) diff --git a/watcher/conf/service.py b/watcher/conf/service.py index c0cd66ac9..72acfd65d 100644 --- a/watcher/conf/service.py +++ b/watcher/conf/service.py @@ -35,7 +35,14 @@ SERVICE_OPTS = [ ), cfg.IntOpt('service_down_time', default=90, - help=_('Maximum time since last check-in for up service.')) + help=_('Maximum time since last check-in for up service.')), + cfg.BoolOpt('print_thread_pool_stats', + default=False, + help=_('Enable logging of thread pool executor statistics ' + 'when submitting tasks. Logs current queue length, ' + 'number of workers (total, idle or running) and ' + 'executor runtime statistics. Useful for diagnosing ' + 'contention and performance issues.')) ] diff --git a/watcher/decision_engine/messaging/audit_endpoint.py b/watcher/decision_engine/messaging/audit_endpoint.py index 7cf0f3e7e..d3368b0c5 100644 --- a/watcher/decision_engine/messaging/audit_endpoint.py +++ b/watcher/decision_engine/messaging/audit_endpoint.py @@ -51,6 +51,7 @@ class AuditEndpoint: def trigger_audit(self, context, audit_uuid): LOG.debug("Trigger audit %s", audit_uuid) + executor.log_executor_stats(self.executor, name="audit-endpoint-pool") self.executor.submit(self.do_trigger_audit, context, audit_uuid) diff --git a/watcher/decision_engine/threading.py b/watcher/decision_engine/threading.py index 8b59f6670..cb449cd6f 100644 --- a/watcher/decision_engine/threading.py +++ b/watcher/decision_engine/threading.py @@ -45,7 +45,9 @@ class DecisionEngineThreadPool(metaclass=service.Singleton): :return: future to monitor progress of execution :rtype: :py:class"`futurist.GreenFuture` """ - + # Statistics from the threadpool + executor.log_executor_stats(self._threadpool, + name="decision-engine-pool") return self._threadpool.submit(fn, *args, **kwargs) @staticmethod diff --git a/watcher/tests/common/test_executor.py b/watcher/tests/common/test_executor.py index 27c3ceb06..8a4916e7c 100644 --- a/watcher/tests/common/test_executor.py +++ b/watcher/tests/common/test_executor.py @@ -36,3 +36,42 @@ class TestFuturistPoolExecutor(base.TestCase): pool_executor = executor.get_futurist_pool_executor(max_workers=1) self.assertIsInstance(pool_executor, futurist.ThreadPoolExecutor) + + +@mock.patch.object(executor.CONF, 'print_thread_pool_stats', True) +class TestLogExecutorStats(base.TestCase): + + @mock.patch.object(executor.LOG, 'debug') + def test_log_executor_stats_eventlet(self, m_log_debug): + workers = 2 + pool_executor = futurist.GreenThreadPoolExecutor(workers) + + executor.log_executor_stats(pool_executor, + name="test-threadpool-eventlet") + + m_log_debug.assert_called_once_with( + f"State of test-threadpool-eventlet GreenThreadPoolExecutor when " + f"submitting a new task: " + f"workers: {len(pool_executor._pool.coroutines_running):d}, " + f"max_workers: {workers:d}, " + f"work queued length: " + f"{pool_executor._delayed_work.unfinished_tasks:d}, " + f"stats: {pool_executor.statistics}") + + @mock.patch.object(executor.LOG, 'debug') + def test_log_executor_stats_threading(self, m_log_debug): + workers = 3 + pool_executor = futurist.ThreadPoolExecutor(workers) + + executor.log_executor_stats(pool_executor, + name="test-threadpool-threading") + + m_log_debug.assert_called_once_with( + f"State of test-threadpool-threading ThreadPoolExecutor when " + f"submitting a new task: " + f"max_workers: {workers:d}, " + f"workers: {len(pool_executor._workers):d}, " + f"idle workers: " + f"{len([w for w in pool_executor._workers if w.idle]):d}, " + f"queued work: {pool_executor._work_queue.qsize():d}, " + f"stats: {pool_executor.statistics}")