Remove eventlet-based timeout in CDM collectors

This patch removes the dependency on eventlet.Timeout for cluster
data model (CDM) collectors and implements a different timeout
handling mechanism, based on futurist waiters, for the compute collector.

Changes:
* Remove eventlet-based timeout wrapper in DecisionEngineSchedulingService
  - Removed _as_timed_sync_func() and _wrap_collector_sync_with_timeout()
  - Simplified add_sync_jobs() to call collector.synchronize() directly

* Implement timeout handling for concurrent operations in Nova collector:
  - Add compute_resources_collector_timeout configuration option (default: 600s)
  - Cancel futures and abort collection gracefully on timeout

Assisted-By: Cursor (claude-4.5-sonnet)

Change-Id: I7969e1ee295c2c625fbfc1f2acdee13ddd82e7dd
Signed-off-by: Douglas Viroel <viroel@gmail.com>
This commit is contained in:
Douglas Viroel
2025-11-26 17:47:50 -03:00
parent f6aac33e70
commit 229bce2a91
12 changed files with 305 additions and 104 deletions

View File

@@ -0,0 +1,11 @@
---
upgrade:
- |
The previous collector operation timeout mechanism, which was based on the
`period` configuration option, has been replaced by a new configuration
option `compute_resources_collector_timeout`. This new configuration
option provides more explicit control over the timeout for resource
collection operations in the Compute Cluster Data Model. Operators
should review and adjust this new configuration according to their
environment's needs. Note that the collectors' `period` configuration did
not change and continues to work as before.

View File

@@ -46,7 +46,15 @@ Custom data model collector plugins can be defined with the
default=1,
help="Time before retry after failed query to "
"external service.",
deprecated_name="api_query_timeout")
deprecated_name="api_query_timeout"),
cfg.IntOpt("compute_resources_collector_timeout",
min=30,
default=600,
help="Timeout in seconds for collecting multiple compute "
"resources from nova. Note that this timeout does not "
"represent the total time for collecting all resources. "
"Setting this value to 0 or small values will cause the "
"collector to abort and stop the collection process."),
]

View File

@@ -190,7 +190,11 @@ class BaseClusterDataModelCollector(loadable.LoadableSingleton,
Whenever called this synchronization will perform a drop-in replacement
with the existing cluster data model
"""
self.cluster_data_model = self.execute()
try:
self.cluster_data_model = self.execute()
except Exception as e:
LOG.exception(e)
self.set_cluster_data_model_as_stale()
class BaseModelBuilder:

View File

@@ -154,7 +154,12 @@ class CinderClusterDataModelCollector(base.BaseClusterDataModelCollector):
return
builder = CinderModelBuilder(self.osc)
return builder.execute(self._data_model_scope)
try:
return builder.execute(self._data_model_scope)
except Exception as e:
LOG.exception(e)
raise exception.ClusterDataModelCollectionError(
cdm="storage") from e
class CinderModelBuilder(base.BaseModelBuilder):

View File

@@ -16,6 +16,7 @@
from oslo_log import log
from watcher.common import exception
from watcher.common import ironic_helper
from watcher.decision_engine.model.collector import base
from watcher.decision_engine.model import element
@@ -67,7 +68,12 @@ class BaremetalClusterDataModelCollector(base.BaseClusterDataModelCollector):
return
builder = BareMetalModelBuilder(self.osc)
return builder.execute(self._data_model_scope)
try:
return builder.execute(self._data_model_scope)
except Exception as e:
LOG.exception(e)
raise exception.ClusterDataModelCollectionError(
cdm="baremetal") from e
class BareMetalModelBuilder(base.BaseModelBuilder):

View File

@@ -13,10 +13,12 @@
# limitations under the License.
import os_resource_classes as orc
from oslo_config import cfg
from oslo_log import log
from futurist import waiters
from watcher.common import exception
from watcher.common import nova_helper
from watcher.common import placement_helper
from watcher.decision_engine.model.collector import base
@@ -27,6 +29,7 @@ from watcher.decision_engine.scope import compute as compute_scope
from watcher.decision_engine import threading
LOG = log.getLogger(__name__)
CONF = cfg.CONF
class NovaClusterDataModelCollector(base.BaseClusterDataModelCollector):
@@ -189,7 +192,12 @@ class NovaClusterDataModelCollector(base.BaseClusterDataModelCollector):
return
builder = NovaModelBuilder(self.osc)
return builder.execute(self._data_model_scope)
try:
return builder.execute(self._data_model_scope)
except Exception as e:
LOG.exception(e)
raise exception.ClusterDataModelCollectionError(
cdm="compute") from e
class NovaModelBuilder(base.BaseModelBuilder):
@@ -219,6 +227,8 @@ class NovaModelBuilder(base.BaseModelBuilder):
self.nova_helper = nova_helper.NovaHelper(osc=self.osc)
self.placement_helper = placement_helper.PlacementHelper(osc=self.osc)
self.executor = threading.DecisionEngineThreadPool()
self.collector_timeout = (
CONF.collector.compute_resources_collector_timeout)
def _collect_aggregates(self, host_aggregates, _nodes):
if not host_aggregates:
@@ -262,7 +272,6 @@ class NovaModelBuilder(base.BaseModelBuilder):
"""
try:
node_info = future.result()[0]
# filter out baremetal node
if node_info.hypervisor_type == 'ironic':
LOG.debug("filtering out baremetal node: %s", node_info)
@@ -314,7 +323,18 @@ class NovaModelBuilder(base.BaseModelBuilder):
self.executor.submit(
self._collect_zones, availability_zones, compute_nodes)
}
waiters.wait_for_all(zone_aggregate_futures)
_done, zone_aggregate_not_done = waiters.wait_for_all(
zone_aggregate_futures,
timeout=self.collector_timeout)
if len(zone_aggregate_not_done) > 0:
LOG.warning("Timed out waiting to collect compute nodes "
"from availability zones and host aggregates. "
"Aborting collection of compute nodes information")
for future in zone_aggregate_not_done:
future.cancel()
# Return and don't continue with the collection
return
# if zones and aggregates did not contain any nodes get every node.
if not compute_nodes:
@@ -334,10 +354,22 @@ class NovaModelBuilder(base.BaseModelBuilder):
# Futures will concurrently be added, only safe with CPython GIL
future_instances = []
self.executor.do_while_futures_modify(
node_futures, self._compute_node_future, future_instances)
node_futures,
self._compute_node_future, future_instances,
futures_timeout=self.collector_timeout)
# Wait for all instance jobs to finish
waiters.wait_for_all(future_instances)
_done, instances_not_done = waiters.wait_for_all(
future_instances,
timeout=self.collector_timeout)
if len(instances_not_done) > 0:
LOG.warning("Timed out waiting to collect instances "
"information for compute nodes. "
"Aborting collection of instances information.")
for future in instances_not_done:
future.cancel()
# Return and don't continue with the collection
return
def add_compute_node(self, node):
# Build and add base node.

View File

@@ -15,11 +15,9 @@
import datetime
import eventlet
from oslo_log import log
from watcher.common import context
from watcher.common import exception
from watcher.common import scheduling
from watcher.decision_engine.model.collector import manager
@@ -44,39 +42,12 @@ class DecisionEngineSchedulingService(scheduling.BackgroundSchedulerService):
return self.collector_manager.get_collectors()
def add_sync_jobs(self):
for name, collector in self.collectors.items():
timed_task = self._wrap_collector_sync_with_timeout(
collector, name)
self.add_job(timed_task,
for collector in self.collectors.values():
self.add_job(collector.synchronize,
trigger='interval',
seconds=collector.config.period,
next_run_time=datetime.datetime.now())
def _as_timed_sync_func(self, sync_func, name, timeout):
def _timed_sync():
with eventlet.Timeout(
timeout,
exception=exception.ClusterDataModelCollectionError(cdm=name)
):
sync_func()
return _timed_sync
def _wrap_collector_sync_with_timeout(self, collector, name):
"""Add an execution timeout constraint on a function"""
timeout = collector.config.period
def _sync():
try:
timed_sync = self._as_timed_sync_func(
collector.synchronize, name, timeout)
timed_sync()
except Exception as exc:
LOG.exception(exc)
collector.set_cluster_data_model_as_stale()
return _sync
def add_checkstate_job(self):
# 30 minutes interval
interval = CONF.watcher_decision_engine.check_periodic_interval

View File

@@ -51,7 +51,7 @@ class DecisionEngineThreadPool(metaclass=service.Singleton):
return self._threadpool.submit(fn, *args, **kwargs)
@staticmethod
def do_while_futures(futures, fn, *args, **kwargs):
def do_while_futures(futures, fn, *args, futures_timeout=None, **kwargs):
"""Do while to execute a function upon completion from a collection
Will execute the specified function with its arguments when one of the
@@ -63,6 +63,7 @@ class DecisionEngineThreadPool(metaclass=service.Singleton):
:type futures: list :py:class:`futurist.GreenFuture`
:param fn: function to execute upon the future finishing execution
:param args: arguments for the function
:param futures_timeout: timeout in seconds for futurist wait operations
:param kwargs: amount of arguments for the function
"""
@@ -72,10 +73,11 @@ class DecisionEngineThreadPool(metaclass=service.Singleton):
futures = copy.copy(futures)
DecisionEngineThreadPool.do_while_futures_modify(
futures, fn, *args, **kwargs)
futures, fn, *args, futures_timeout=futures_timeout, **kwargs)
@staticmethod
def do_while_futures_modify(futures, fn, *args, **kwargs):
def do_while_futures_modify(futures, fn, *args,
futures_timeout=None, **kwargs):
"""Do while to execute a function upon completion from a collection
Will execute the specified function with its arguments when one of the
@@ -86,13 +88,25 @@ class DecisionEngineThreadPool(metaclass=service.Singleton):
:param futures: list, set or dictionary of futures
:type futures: list :py:class:`futurist.GreenFuture`
:param fn: function to execute upon the future finishing execution
:param futures_timeout: timeout in seconds for futurist wait operations
:param args: arguments for the function
:param kwargs: amount of arguments for the function
"""
waits = waiters.wait_for_any(futures)
waits = waiters.wait_for_any(futures, timeout=futures_timeout)
while len(waits[0]) > 0 or len(waits[1]) > 0:
for future in waiters.wait_for_any(futures)[0]:
# NOTE(dviroel): if finished futures are empty, the wait_for_any
# has returned due to a timeout, if provided by the caller.
# In this scenario, we cancel the remaining pending futures and
# break the loop, otherwise it may stay in the loop indefinitely.
if not len(waits[0]):
LOG.warning("No futures finished during the timeout period, "
"aborting remaining pending futures")
for future in waits[1]:
future.cancel()
break
for future in waits[0]:
fn(future, *args, **kwargs)
futures.remove(future)
waits = waiters.wait_for_any(futures)
waits = waiters.wait_for_any(futures, timeout=futures_timeout)

View File

@@ -151,5 +151,5 @@ class TestCinderClusterDataModelCollector(base.TestCase):
config=m_config, osc=m_osc)
cinder_cdmc.get_audit_scope_handler([])
self.assertRaises(exception.InvalidPoolAttributeValue,
self.assertRaises(exception.ClusterDataModelCollectionError,
cinder_cdmc.execute)

View File

@@ -16,9 +16,13 @@
# limitations under the License.
import ddt
import futurist
import os_resource_classes as orc
import time
from unittest import mock
from novaclient.v2 import hypervisors
from watcher.common import nova_helper
from watcher.common import placement_helper
from watcher.decision_engine.model.collector import nova
@@ -512,6 +516,147 @@ class TestNovaModelBuilder(base.TestCase):
self.assertEqual(
m_nova.return_value.get_instance_list.call_count, 2)
@mock.patch.object(futurist.Future, 'cancel')
@mock.patch.object(placement_helper, 'PlacementHelper')
@mock.patch.object(nova_helper.NovaHelper, 'get_aggregate_list')
@mock.patch.object(nova_helper.NovaHelper, 'get_service_list')
@mock.patch.object(nova_helper.NovaHelper, 'get_compute_node_by_name')
@mock.patch.object(nova.NovaModelBuilder, 'add_instance_node')
def test_add_physical_layer_instances_timeout(
self, m_add_instance_node, m_nh_compute_name,
m_nh_service, m_nh_aggr, m_placement, m_fut_cancel):
"""Test add_physical_layer with timeout on collecting instances"""
mock_placement = mock.Mock(name="placement_helper")
mock_placement.get_inventories.return_value = dict()
mock_placement.get_usages_for_resource_provider.return_value = None
m_placement.return_value = mock_placement
m_nh_aggr.return_value = (
[mock.Mock(id=1, name='example'),
mock.Mock(id=5, name='example', hosts=['hostone', 'hosttwo'])])
m_nh_service.return_value = (
[mock.Mock(zone='av_b', host='hostthree'),
mock.Mock(zone='av_a', host='hostone')])
compute_node_one_info = {
'id': '796fee99-65dd-4262-aa-fd2a1143faa6',
'hypervisor_hostname': 'hostone',
'hypervisor_type': 'QEMU',
'state': 'TEST_STATE',
'status': 'TEST_STATUS',
'memory_mb': 333,
'memory_mb_used': 100,
'free_disk_gb': 222,
'local_gb': 111,
'local_gb_used': 10,
'vcpus': 4,
'vcpus_used': 0,
'servers': [
{'name': 'fake_instance',
'uuid': 'ef500f7e-dac8-470f-960c-169486fce71b'}
],
'service': {'id': 123, 'host': 'hostone',
'disabled_reason': ''},
}
compute_node_one = hypervisors.Hypervisor(
hypervisors.HypervisorManager, info=compute_node_one_info)
compute_node_two_info = {
'id': '796fee99-65dd-4262-aa-fd2a1143faa7',
'hypervisor_hostname': 'hosttwo',
'hypervisor_type': 'QEMU',
'state': 'TEST_STATE',
'status': 'TEST_STATUS',
'memory_mb': 333,
'memory_mb_used': 100,
'free_disk_gb': 222,
'local_gb': 111,
'local_gb_used': 10,
'vcpus': 4,
'vcpus_used': 0,
'servers': [
{'name': 'fake_instance2',
'uuid': 'ef500f7e-dac8-47f0-960c-169486fce71b'}
],
'service': {'id': 123, 'host': 'hosttwo',
'disabled_reason': ''},
}
compute_node_two = hypervisors.Hypervisor(
hypervisors.HypervisorManager, info=compute_node_two_info)
# Set max general workers to 1 to ensure only one worker is used
# and we time out while collecting instances
self.flags(max_general_workers=1, group='watcher_decision_engine')
m_nh_compute_name.side_effect = [
[compute_node_one], [compute_node_two]
]
m_scope = [{"compute": [
{"host_aggregates": [{"id": 5}]},
{"availability_zones": [{"name": "av_a"}]}
]}]
def fake_collector(self, *args):
return time.sleep(0.5)
m_add_instance_node.side_effect = mock.Mock(
side_effect=fake_collector)
t_nova_cluster = nova.NovaModelBuilder(mock.Mock())
# NOTE(dviroel): ModelBuilder reads the timeout directly
# from the configuration, which doesn't allow values lower than 30s.
# Here we need to set the value to 0.1 to simulate
# a timeout without adding huge sleeps in the fake collector.
t_nova_cluster.collector_timeout = 0.1
model = t_nova_cluster.execute(m_scope)
# Assert that futures were cancelled
m_fut_cancel.assert_has_calls([mock.call() for _ in range(2)])
self.assertEqual(len(model.get_all_compute_nodes()), 2)
# Instances should not be added to the model due to timeout
self.assertEqual(len(model.get_all_instances()), 0)
@mock.patch.object(nova_helper.NovaHelper, 'get_compute_node_list')
@mock.patch.object(nova_helper.NovaHelper, 'get_compute_node_by_name')
@mock.patch.object(nova.NovaModelBuilder, '_collect_aggregates')
@mock.patch.object(nova.NovaModelBuilder, '_collect_zones')
def test_add_physical_layer_aggregates_timeout(
self, m_collect_zones, m_collect_aggregates,
m_nh_node_name, m_nh_node_list):
"""Test add_physical_layer with timeout on collecting aggregates"""
self.flags(max_general_workers=1, group='watcher_decision_engine')
def fake_collector(self, *args):
pass
m_collect_aggregates.side_effect = mock.Mock(
side_effect=fake_collector)
m_collect_zones.side_effect = mock.Mock(
side_effect=fake_collector)
t_nova_cluster = nova.NovaModelBuilder(mock.Mock())
# NOTE(dviroel): ModelBuilder reads the timeout directly
# from the configuration, which doesn't allow values lower than 30s.
# Setting timeout to 0 so the waiter will return immediately
t_nova_cluster.collector_timeout = 0
m_scope = [{"compute": []}]
model = t_nova_cluster.execute(m_scope)
# Model shouldn't have any updated compute nodes
self.assertEqual(len(model.get_all_compute_nodes()), 0)
# Get compute node list and by name shouldn't be called
m_nh_node_list.assert_not_called()
m_nh_node_name.assert_not_called()
@mock.patch.object(placement_helper, 'PlacementHelper')
@mock.patch.object(nova_helper, 'NovaHelper')
def test_add_physical_layer_with_baremetal_node(self, m_nova,
@@ -529,34 +674,38 @@ class TestNovaModelBuilder(base.TestCase):
[mock.Mock(zone='av_b', host='hostthree'),
mock.Mock(zone='av_a', host='hostone')]
compute_node = mock.Mock(
id='796fee99-65dd-4262-aa-fd2a1143faa6',
hypervisor_hostname='hostone',
hypervisor_type='QEMU',
state='TEST_STATE',
status='TEST_STATUS',
memory_mb=333,
memory_mb_used=100,
free_disk_gb=222,
local_gb=111,
local_gb_used=10,
vcpus=4,
vcpus_used=0,
servers=[
compute_node_info = {
'id': '796fee99-65dd-4262-aa-fd2a1143faa6',
'hypervisor_hostname': 'hostone',
'hypervisor_type': 'QEMU',
'state': 'TEST_STATE',
'status': 'TEST_STATUS',
'memory_mb': 333,
'memory_mb_used': 100,
'free_disk_gb': 222,
'local_gb': 111,
'local_gb_used': 10,
'vcpus': 4,
'vcpus_used': 0,
'servers': [
{'name': 'fake_instance',
'uuid': 'ef500f7e-dac8-470f-960c-169486fce71b'}
],
service={'id': 123, 'host': 'hostone',
'disabled_reason': ''},
)
'service': {'id': 123, 'host': 'hostone', 'disabled_reason': ''},
}
compute_node = hypervisors.Hypervisor(
hypervisors.HypervisorManager, info=compute_node_info)
baremetal_node = mock.Mock(
id='5f2d1b3d-4099-4623-b9-05148aefd6cb',
hypervisor_hostname='hosttwo',
hypervisor_type='ironic',
state='TEST_STATE',
status='TEST_STATUS',
)
baremetal_node_info = {
'id': '5f2d1b3d-4099-4623-b9-05148aefd6cb',
'status': 'TEST_STATUS',
'hypervisor_hostname': 'hosttwo',
'service': {
'host': 'hosttwo'
}
}
baremetal_node = hypervisors.Hypervisor(
hypervisors.HypervisorManager, info=baremetal_node_info)
m_nova.return_value.get_compute_node_by_name.side_effect = [
[compute_node], [baremetal_node]]
@@ -577,3 +726,13 @@ class TestNovaModelBuilder(base.TestCase):
'hosttwo', servers=True, detailed=True)
self.assertEqual(
m_nova.return_value.get_compute_node_by_name.call_count, 2)
def test_nova_model_builder_timeout_configuration(self):
"""Test that model builder timeout is configured"""
self.flags(compute_resources_collector_timeout=123,
group='collector')
t_nova_cluster = nova.NovaModelBuilder(mock.Mock())
self.assertEqual(t_nova_cluster.collector_timeout, 123)

View File

@@ -17,7 +17,6 @@
from apscheduler.schedulers import background
from apscheduler.triggers import interval as interval_trigger
import eventlet
from unittest import mock
from oslo_config import cfg
@@ -109,33 +108,3 @@ class TestDecisionEngineSchedulingService(base.TestCase):
self.assertTrue(bool(fake_collector.cluster_data_model))
self.assertIsInstance(job.trigger, interval_trigger.IntervalTrigger)
@mock.patch.object(
default_loading.ClusterDataModelCollectorLoader, 'load')
@mock.patch.object(
default_loading.ClusterDataModelCollectorLoader, 'list_available')
@mock.patch.object(background.BackgroundScheduler, 'start')
def test_execute_sync_job_fails(self, m_start, m_list_available,
m_load, m_list, m_save):
fake_config = mock.Mock(period=.01)
fake_collector = faker_cluster_state.FakerModelCollector(
config=fake_config)
fake_collector.synchronize = mock.Mock(
side_effect=lambda: eventlet.sleep(.5))
m_list_available.return_value = {
'fake': faker_cluster_state.FakerModelCollector}
m_load.return_value = fake_collector
scheduler = scheduling.DecisionEngineSchedulingService()
scheduler.start()
m_start.assert_called_once_with(scheduler)
jobs = scheduler.get_jobs()
self.assertEqual(2, len(jobs))
job = jobs[0]
job.func()
self.assertFalse(bool(fake_collector.cluster_data_model))
self.assertIsInstance(job.trigger, interval_trigger.IntervalTrigger)

View File

@@ -18,6 +18,7 @@
import futurist
from unittest import mock
from watcher.common import executor
from watcher.decision_engine import threading
from watcher.tests.unit import base
@@ -47,8 +48,11 @@ class TestDecisionEngineThreadPool(base.TestCase):
self.m_threadpool.submit = self.m_threadpool.submit.__get__(
self.m_threadpool, threading.DecisionEngineThreadPool)
# perform all tests synchronously
self.m_threadpool._threadpool = futurist.SynchronousExecutor()
self.m_threadpool._threadpool = executor.get_futurist_pool_executor(1)
@staticmethod
def noop_function(*args, **kwargs):
pass
def test_singleton(self):
"""Ensure only one object of DecisionEngineThreadPool can be created"""
@@ -74,6 +78,7 @@ class TestDecisionEngineThreadPool(base.TestCase):
# create a collection of futures from submitted m_function tasks
futures = [self.m_threadpool.submit(self.m_function, 1, 2)]
futurist.waiters.wait_for_all(futures)
self.m_function.assert_called_once_with(1, 2)
@@ -99,6 +104,7 @@ class TestDecisionEngineThreadPool(base.TestCase):
# create a collection of futures from submitted m_function tasks
futures = [self.m_threadpool.submit(self.m_function, 1, 2)]
futurist.waiters.wait_for_all(futures)
self.m_function.assert_called_once_with(1, 2)
@@ -146,3 +152,19 @@ class TestDecisionEngineThreadPool(base.TestCase):
# test that the passed do_while function has been called 10 times
self.m_do_while_function.assert_has_calls(
calls_do_while, any_order=True)
def test_do_while_futures_modify_timeout(self):
"""Test the operation of the do_while_futures with a timeout"""
# create a collection of futures from submitted m_function tasks
futures = [self.m_threadpool.submit(
TestDecisionEngineThreadPool.noop_function) for i in range(3)]
self.m_threadpool.do_while_futures_modify(
futures, self.m_do_while_function, futures_timeout=0)
# At least one future should be running or cancelled
self.assertGreater(len(futures), 0)
for future in futures:
# We only expect futures that were cancelled or are still running
self.assertTrue(future.cancelled() or future.running())