Add a reserved workers pool (5% by default)

I've seen a situation where heartbeats managed to completely saturate
the conductor workers, so that no API request that required interaction
with the conductor (i.e. everything other than reads) could get through.
Add periodic tasks for a large number of nodes (thousands), and you get
a completely locked-up Ironic.

This change reserves 5% (configurable) of the threads for API requests.
This is done by splitting the single executor into two, of which the
latter is used only by normal _spawn_worker calls and only when the
former is exhausted. This allows an operator to apply a remediation,
e.g. abort some deployments or outright power off some nodes.
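
As a rough standalone sketch of that pattern (illustration only: it uses
futurist.ThreadPoolExecutor rather than Ironic's eventlet-based
GreenThreadPoolExecutor, and the pool sizes and names below are made up):

import futurist
from futurist import rejection

# No queuing: 1 rather than 0 because futurist checks for rejection first.
_reject = rejection.reject_when_reached(1)

normal_pool = futurist.ThreadPoolExecutor(max_workers=95,
                                          check_and_reject=_reject)
reserved_pool = futurist.ThreadPoolExecutor(max_workers=5,
                                            check_and_reject=_reject)


def spawn(func, *args, allow_reserved_pool=True, **kwargs):
    try:
        return normal_pool.submit(func, *args, **kwargs)
    except futurist.RejectedSubmission:
        if not allow_reserved_pool:
            # e.g. heartbeats and periodic tasks give up here
            raise
        # Work coming from the API may still fall back to the reserved pool.
        return reserved_pool.submit(func, *args, **kwargs)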

Partial-Bug: #2038438
Change-Id: Iacc62d33ffccfc11694167ee2a7bc6aad82c1f2f
Dmitry Tantsur 2023-12-06 18:17:39 +01:00
parent 2373127c7d
commit adec0f6f01
7 changed files with 112 additions and 20 deletions


@@ -149,6 +149,13 @@ using more than one CPU core.
If you use JSON RPC, you also need to make sure the ports don't conflict by
setting the :oslo.config:option:`json_rpc.port` option.

Starting with the 2024.1 "Caracal" release cycle, a small proportion of the
threads (specified by the
:oslo.config:option:`conductor.reserved_workers_pool_percentage` option) is
reserved for API requests and other critical tasks. Periodic tasks and agent
heartbeats cannot use them. This ensures that the API stays responsive even
under extreme internal load.

.. _eventlet: https://eventlet.net/

Database


@@ -99,6 +99,29 @@ class BaseConductorManager(object):
        # clear all locks held by this conductor before registering
        self.dbapi.clear_node_reservations_for_conductor(self.host)

    def _init_executors(self, total_workers, reserved_percentage):
        # NOTE(dtantsur): do not allow queuing work. Given our model, it's
        # better to reject an incoming request with HTTP 503 or reschedule
        # a periodic task than to end up with a hidden backlog that is hard
        # to track and debug. Using 1 instead of 0 because of how things are
        # ordered in futurist (it checks for rejection first).
        rejection_func = rejection.reject_when_reached(1)

        reserved_workers = int(total_workers * reserved_percentage / 100)
        remaining = total_workers - reserved_workers
        LOG.info("Starting workers pool: %d normal workers + %d reserved",
                 remaining, reserved_workers)

        self._executor = futurist.GreenThreadPoolExecutor(
            max_workers=remaining,
            check_and_reject=rejection_func)

        if reserved_workers:
            self._reserved_executor = futurist.GreenThreadPoolExecutor(
                max_workers=reserved_workers,
                check_and_reject=rejection_func)
        else:
            self._reserved_executor = None

    def init_host(self, admin_context=None, start_consoles=True,
                  start_allocations=True):
        """Initialize the conductor host.
@@ -125,16 +148,8 @@ class BaseConductorManager(object):
        self._keepalive_evt = threading.Event()
        """Event for the keepalive thread."""

        # NOTE(dtantsur): do not allow queuing work. Given our model, it's
        # better to reject an incoming request with HTTP 503 or reschedule
        # a periodic task than to end up with a hidden backlog that is hard
        # to track and debug. Using 1 instead of 0 because of how things are
        # ordered in futurist (it checks for rejection first).
        rejection_func = rejection.reject_when_reached(1)
        self._executor = futurist.GreenThreadPoolExecutor(
            max_workers=CONF.conductor.workers_pool_size,
            check_and_reject=rejection_func)
        """Executor for performing tasks async."""
        self._init_executors(CONF.conductor.workers_pool_size,
                             CONF.conductor.reserved_workers_pool_percentage)

        # TODO(jroll) delete the use_groups argument and use the default
        # in Stein.
@@ -358,6 +373,8 @@ class BaseConductorManager(object):
        # having work complete normally.
        self._periodic_tasks.stop()
        self._periodic_tasks.wait()

        if self._reserved_executor is not None:
            self._reserved_executor.shutdown(wait=True)
        self._executor.shutdown(wait=True)

        if self._zeroconf is not None:
@@ -453,7 +470,8 @@ class BaseConductorManager(object):
            if self._mapped_to_this_conductor(*result[:3]):
                yield result

    def _spawn_worker(self, func, *args, **kwargs):
    def _spawn_worker(self, func, *args, _allow_reserved_pool=True,
                      **kwargs):
        """Create a greenthread to run func(*args, **kwargs).
@@ -466,6 +484,14 @@ class BaseConductorManager(object):
        """
        try:
            return self._executor.submit(func, *args, **kwargs)
        except futurist.RejectedSubmission:
            if not _allow_reserved_pool or self._reserved_executor is None:
                raise exception.NoFreeConductorWorker()

            LOG.debug('Normal workers pool is full, using reserved pool to run %s',
                      func.__qualname__)
            try:
                return self._reserved_executor.submit(func, *args, **kwargs)
            except futurist.RejectedSubmission:
                raise exception.NoFreeConductorWorker()


@@ -3209,7 +3209,10 @@ class ConductorManager(base_manager.BaseConductorManager):
            task.spawn_after(
                self._spawn_worker, task.driver.deploy.heartbeat,
                task, callback_url, agent_version, agent_verify_ca,
                agent_status, agent_status_message)
                agent_status, agent_status_message,
                # NOTE(dtantsur): heartbeats are not critical enough to be
                # allowed to potentially overload the conductor.
                _allow_reserved_pool=False)

    @METRICS.timer('ConductorManager.vif_list')
    @messaging.expected_exceptions(exception.NetworkError,


@@ -28,6 +28,12 @@ opts = [
                      'itself for handling heart beats and periodic tasks. '
                      'On top of that, `sync_power_state_workers` will take '
                      'up to 7 green threads with the default value of 8.')),
    cfg.IntOpt('reserved_workers_pool_percentage',
               default=5, min=0, max=50,
               help=_('The percentage of the whole workers pool that will be '
                      'kept for API requests and other important tasks. '
                      'This part of the pool will not be used for periodic '
                      'tasks or agent heartbeats. Set to 0 to disable.')),
    cfg.IntOpt('heartbeat_interval',
               default=10,
               help=_('Seconds between conductor heart beats.')),


@@ -402,20 +402,55 @@ class ManagerSpawnWorkerTestCase(tests_base.TestCase):
    def setUp(self):
        super(ManagerSpawnWorkerTestCase, self).setUp()
        self.service = manager.ConductorManager('hostname', 'test-topic')
        self.executor = mock.Mock(spec=futurist.GreenThreadPoolExecutor)
        self.service._executor = self.executor
        self.service._executor = mock.Mock(
            spec=futurist.GreenThreadPoolExecutor)
        self.service._reserved_executor = mock.Mock(
            spec=futurist.GreenThreadPoolExecutor)

        self.func = lambda: None

    def test__spawn_worker(self):
        self.service._spawn_worker('fake', 1, 2, foo='bar', cat='meow')
        self.service._spawn_worker(self.func, 1, 2, foo='bar', cat='meow')

        self.executor.submit.assert_called_once_with(
            'fake', 1, 2, foo='bar', cat='meow')
        self.service._executor.submit.assert_called_once_with(
            self.func, 1, 2, foo='bar', cat='meow')
        self.service._reserved_executor.submit.assert_not_called()

    def test__spawn_worker_none_free(self):
        self.executor.submit.side_effect = futurist.RejectedSubmission()
    def test__spawn_worker_reserved(self):
        self.service._executor.submit.side_effect = \
            futurist.RejectedSubmission()

        self.service._spawn_worker(self.func, 1, 2, foo='bar', cat='meow')

        self.service._executor.submit.assert_called_once_with(
            self.func, 1, 2, foo='bar', cat='meow')
        self.service._reserved_executor.submit.assert_called_once_with(
            self.func, 1, 2, foo='bar', cat='meow')

    def test__spawn_worker_cannot_use_reserved(self):
        self.service._executor.submit.side_effect = \
            futurist.RejectedSubmission()

        self.assertRaises(exception.NoFreeConductorWorker,
                          self.service._spawn_worker, 'fake')
                          self.service._spawn_worker, self.func,
                          _allow_reserved_pool=False)

    def test__spawn_worker_no_reserved(self):
        self.service._executor.submit.side_effect = \
            futurist.RejectedSubmission()
        self.service._reserved_executor = None

        self.assertRaises(exception.NoFreeConductorWorker,
                          self.service._spawn_worker, self.func)

    def test__spawn_worker_none_free(self):
        self.service._executor.submit.side_effect = \
            futurist.RejectedSubmission()
        self.service._reserved_executor.submit.side_effect = \
            futurist.RejectedSubmission()

        self.assertRaises(exception.NoFreeConductorWorker,
                          self.service._spawn_worker, self.func)


@mock.patch.object(objects.Conductor, 'unregister_all_hardware_interfaces',


@@ -7609,6 +7609,15 @@ class DoNodeAdoptionTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
        self.assertEqual(states.NOSTATE, node.target_provision_state)
        self.assertIsNone(node.last_error)


@mgr_utils.mock_record_keepalive
class HeartbeatTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):

    def _fake_spawn(self, conductor_obj, func, *args, **kwargs):
        self.assertFalse(kwargs.pop('_allow_reserved_pool'))
        func(*args, **kwargs)
        return mock.MagicMock()

    # TODO(TheJulia): We should double check if these heartbeat tests need
    # to move. I have this strange feeling we were lacking rpc testing of
    # heartbeat until we did adoption testing....


@@ -0,0 +1,6 @@
---
fixes:
  - |
    Each conductor now reserves a small proportion of its worker threads (5%
    by default) for API requests and other critical tasks. This ensures that
    the API stays responsive even under extreme internal load.
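
To illustrate what the default means in practice, here is the split as
computed by _init_executors, assuming (purely for the sake of the example)
a pool of 100 workers; the actual pool size comes from
conductor.workers_pool_size and is not changed by this patch:

# Hypothetical numbers, mirroring the arithmetic in _init_executors.
workers_pool_size = 100                # assumed for illustration
reserved_workers_pool_percentage = 5   # the new option's default

reserved = int(workers_pool_size * reserved_workers_pool_percentage / 100)
normal = workers_pool_size - reserved
print(normal, reserved)  # 95 workers for general work, 5 kept for API requests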