From adec0f6f01a1733bbdbe8e1f962138e5c7363e90 Mon Sep 17 00:00:00 2001
From: Dmitry Tantsur
Date: Wed, 6 Dec 2023 18:17:39 +0100
Subject: [PATCH] Add a reserved workers pool (5% by default)

I've seen a situation where heartbeats managed to completely saturate
the conductor workers, so that no API request that required interaction
with the conductor (i.e. everything other than reads) could come
through. Add periodic tasks for a large number of nodes (thousands),
and you get a completely locked-up Ironic.

This change reserves 5% (configurable) of the threads for API requests.
This is done by splitting the single executor into two: the second one
is used only by normal _spawn_worker calls and only when the first one
is exhausted. This allows an operator to apply a remediation, e.g.
abort some deployments or outright power off some nodes.

Partial-Bug: #2038438
Change-Id: Iacc62d33ffccfc11694167ee2a7bc6aad82c1f2f
---
 doc/source/admin/tuning.rst                      |  7 +++
 ironic/conductor/base_manager.py                 | 48 +++++++++++++----
 ironic/conductor/manager.py                      |  5 +-
 ironic/conf/conductor.py                         |  6 +++
 .../tests/unit/conductor/test_base_manager.py    | 51 ++++++++++++++++---
 ironic/tests/unit/conductor/test_manager.py      |  9 ++++
 .../reserved-workers-3cc0af8782b00fcc.yaml       |  6 +++
 7 files changed, 112 insertions(+), 20 deletions(-)
 create mode 100644 releasenotes/notes/reserved-workers-3cc0af8782b00fcc.yaml

diff --git a/doc/source/admin/tuning.rst b/doc/source/admin/tuning.rst
index d0e772f29d..662f168826 100644
--- a/doc/source/admin/tuning.rst
+++ b/doc/source/admin/tuning.rst
@@ -149,6 +149,13 @@ using more than one CPU core. If you use JSON RPC, you also need to make sure
 the ports don't conflict by setting the
 :oslo.config:option:`json_rpc.port` option.
 
+Starting with the 2024.1 "Caracal" release cycle, a small proportion of the
+threads (specified by the
+:oslo.config:option:`conductor.reserved_workers_pool_percentage` option) is
+reserved for API requests and other critical tasks. Periodic tasks and agent
+heartbeats cannot use them. This ensures that the API stays responsive even
+under extreme internal load.
+
 .. _eventlet: https://eventlet.net/
 
 Database
diff --git a/ironic/conductor/base_manager.py b/ironic/conductor/base_manager.py
index c6f4b3273c..4cada13501 100644
--- a/ironic/conductor/base_manager.py
+++ b/ironic/conductor/base_manager.py
@@ -99,6 +99,29 @@ class BaseConductorManager(object):
         # clear all locks held by this conductor before registering
         self.dbapi.clear_node_reservations_for_conductor(self.host)
 
+    def _init_executors(self, total_workers, reserved_percentage):
+        # NOTE(dtantsur): do not allow queuing work. Given our model, it's
+        # better to reject an incoming request with HTTP 503 or reschedule
+        # a periodic task than to end up with a hidden backlog that is hard
+        # to track and debug. Using 1 instead of 0 because of how things are
+        # ordered in futurist (it checks for rejection first).
+        rejection_func = rejection.reject_when_reached(1)
+
+        reserved_workers = int(total_workers * reserved_percentage / 100)
+        remaining = total_workers - reserved_workers
+        LOG.info("Starting workers pool: %d normal workers + %d reserved",
+                 remaining, reserved_workers)
+
+        self._executor = futurist.GreenThreadPoolExecutor(
+            max_workers=remaining,
+            check_and_reject=rejection_func)
+        if reserved_workers:
+            self._reserved_executor = futurist.GreenThreadPoolExecutor(
+                max_workers=reserved_workers,
+                check_and_reject=rejection_func)
+        else:
+            self._reserved_executor = None
+
     def init_host(self, admin_context=None, start_consoles=True,
                   start_allocations=True):
         """Initialize the conductor host.
@@ -125,16 +148,8 @@ class BaseConductorManager(object):
         self._keepalive_evt = threading.Event()
         """Event for the keepalive thread."""
 
-        # NOTE(dtantsur): do not allow queuing work. Given our model, it's
-        # better to reject an incoming request with HTTP 503 or reschedule
-        # a periodic task that end up with hidden backlog that is hard
-        # to track and debug. Using 1 instead of 0 because of how things are
-        # ordered in futurist (it checks for rejection first).
-        rejection_func = rejection.reject_when_reached(1)
-        self._executor = futurist.GreenThreadPoolExecutor(
-            max_workers=CONF.conductor.workers_pool_size,
-            check_and_reject=rejection_func)
-        """Executor for performing tasks async."""
+        self._init_executors(CONF.conductor.workers_pool_size,
+                             CONF.conductor.reserved_workers_pool_percentage)
 
         # TODO(jroll) delete the use_groups argument and use the default
         # in Stein.
@@ -358,6 +373,8 @@ class BaseConductorManager(object):
         # having work complete normally.
         self._periodic_tasks.stop()
         self._periodic_tasks.wait()
+        if self._reserved_executor is not None:
+            self._reserved_executor.shutdown(wait=True)
         self._executor.shutdown(wait=True)
 
         if self._zeroconf is not None:
@@ -453,7 +470,8 @@ class BaseConductorManager(object):
             if self._mapped_to_this_conductor(*result[:3]):
                 yield result
 
-    def _spawn_worker(self, func, *args, **kwargs):
+    def _spawn_worker(self, func, *args, _allow_reserved_pool=True,
+                      **kwargs):
 
         """Create a greenthread to run func(*args, **kwargs).
 
@@ -466,6 +484,14 @@
         """
         try:
             return self._executor.submit(func, *args, **kwargs)
         except futurist.RejectedSubmission:
-            raise exception.NoFreeConductorWorker()
+            if not _allow_reserved_pool or self._reserved_executor is None:
+                raise exception.NoFreeConductorWorker()
+
+            LOG.debug('Normal workers pool is full, using reserved pool to '
+                      'run %s', func.__qualname__)
+            try:
+                return self._reserved_executor.submit(func, *args, **kwargs)
+            except futurist.RejectedSubmission:
+                raise exception.NoFreeConductorWorker()
 
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index 5efa3f66af..3e53f15353 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -3209,7 +3209,10 @@ class ConductorManager(base_manager.BaseConductorManager):
             task.spawn_after(
                 self._spawn_worker, task.driver.deploy.heartbeat,
                 task, callback_url, agent_version, agent_verify_ca,
-                agent_status, agent_status_message)
+                agent_status, agent_status_message,
+                # NOTE(dtantsur): heartbeats are not critical enough to be
+                # allowed to potentially overload the conductor.
+                _allow_reserved_pool=False)
 
     @METRICS.timer('ConductorManager.vif_list')
     @messaging.expected_exceptions(exception.NetworkError,
diff --git a/ironic/conf/conductor.py b/ironic/conf/conductor.py
index 3c6261536f..01f385ba6f 100644
--- a/ironic/conf/conductor.py
+++ b/ironic/conf/conductor.py
@@ -28,6 +28,12 @@ opts = [
                       'itself for handling heart beats and periodic tasks. '
                       'On top of that, `sync_power_state_workers` will take '
                      'up to 7 green threads with the default value of 8.')),
+    cfg.IntOpt('reserved_workers_pool_percentage',
+               default=5, min=0, max=50,
+               help=_('The percentage of the whole workers pool that will be '
+                      'kept for API requests and other important tasks. '
+                      'This part of the pool will not be used for periodic '
+                      'tasks or agent heartbeats. Set to 0 to disable.')),
     cfg.IntOpt('heartbeat_interval',
                default=10,
                help=_('Seconds between conductor heart beats.')),
diff --git a/ironic/tests/unit/conductor/test_base_manager.py b/ironic/tests/unit/conductor/test_base_manager.py
index b8b60b01ba..eae61eaf4a 100644
--- a/ironic/tests/unit/conductor/test_base_manager.py
+++ b/ironic/tests/unit/conductor/test_base_manager.py
@@ -402,20 +402,55 @@ class ManagerSpawnWorkerTestCase(tests_base.TestCase):
     def setUp(self):
         super(ManagerSpawnWorkerTestCase, self).setUp()
         self.service = manager.ConductorManager('hostname', 'test-topic')
-        self.executor = mock.Mock(spec=futurist.GreenThreadPoolExecutor)
-        self.service._executor = self.executor
+        self.service._executor = mock.Mock(
+            spec=futurist.GreenThreadPoolExecutor)
+        self.service._reserved_executor = mock.Mock(
+            spec=futurist.GreenThreadPoolExecutor)
+
+        self.func = lambda: None
 
     def test__spawn_worker(self):
-        self.service._spawn_worker('fake', 1, 2, foo='bar', cat='meow')
+        self.service._spawn_worker(self.func, 1, 2, foo='bar', cat='meow')
 
-        self.executor.submit.assert_called_once_with(
-            'fake', 1, 2, foo='bar', cat='meow')
+        self.service._executor.submit.assert_called_once_with(
+            self.func, 1, 2, foo='bar', cat='meow')
+        self.service._reserved_executor.submit.assert_not_called()
 
-    def test__spawn_worker_none_free(self):
-        self.executor.submit.side_effect = futurist.RejectedSubmission()
+    def test__spawn_worker_reserved(self):
+        self.service._executor.submit.side_effect = \
+            futurist.RejectedSubmission()
+
+        self.service._spawn_worker(self.func, 1, 2, foo='bar', cat='meow')
+
+        self.service._executor.submit.assert_called_once_with(
+            self.func, 1, 2, foo='bar', cat='meow')
+        self.service._reserved_executor.submit.assert_called_once_with(
+            self.func, 1, 2, foo='bar', cat='meow')
+
+    def test__spawn_worker_cannot_use_reserved(self):
+        self.service._executor.submit.side_effect = \
+            futurist.RejectedSubmission()
 
         self.assertRaises(exception.NoFreeConductorWorker,
-                          self.service._spawn_worker, 'fake')
+                          self.service._spawn_worker, self.func,
+                          _allow_reserved_pool=False)
+
+    def test__spawn_worker_no_reserved(self):
+        self.service._executor.submit.side_effect = \
+            futurist.RejectedSubmission()
+        self.service._reserved_executor = None
+
+        self.assertRaises(exception.NoFreeConductorWorker,
+                          self.service._spawn_worker, self.func)
+
+    def test__spawn_worker_none_free(self):
+        self.service._executor.submit.side_effect = \
+            futurist.RejectedSubmission()
+        self.service._reserved_executor.submit.side_effect = \
+            futurist.RejectedSubmission()
+
+        self.assertRaises(exception.NoFreeConductorWorker,
+                          self.service._spawn_worker, self.func)
 
 
 @mock.patch.object(objects.Conductor, 'unregister_all_hardware_interfaces',
diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py
index c08137a4ab..4b7c66ba07 100644
--- a/ironic/tests/unit/conductor/test_manager.py
+++ b/ironic/tests/unit/conductor/test_manager.py
@@ -7609,6 +7609,15 @@ class DoNodeAdoptionTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self.assertEqual(states.NOSTATE, node.target_provision_state)
         self.assertIsNone(node.last_error)
+
+@mgr_utils.mock_record_keepalive
+class HeartbeatTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
+
+    def _fake_spawn(self, conductor_obj, func, *args, **kwargs):
+        self.assertFalse(kwargs.pop('_allow_reserved_pool'))
+        func(*args, **kwargs)
+        return mock.MagicMock()
+
 
 # TODO(TheJulia): We should double check if these heartbeat tests need
 # to move. I have this strange feeling we were lacking rpc testing of
 # heartbeat until we did adoption testing....
diff --git a/releasenotes/notes/reserved-workers-3cc0af8782b00fcc.yaml b/releasenotes/notes/reserved-workers-3cc0af8782b00fcc.yaml
new file mode 100644
index 0000000000..f1466e6b27
--- /dev/null
+++ b/releasenotes/notes/reserved-workers-3cc0af8782b00fcc.yaml
@@ -0,0 +1,6 @@
+---
+fixes:
+  - |
+    Each conductor now reserves a small proportion of its worker threads (5%
+    by default) for API requests and other critical tasks. This ensures that
+    the API stays responsive even under extreme internal load.
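
Reviewer note (illustration only, not part of the patch): the heart of this
change is the "try the normal pool first, fall back to the reserved pool"
logic in _spawn_worker(). The sketch below reproduces that logic with plain
futurist primitives so it can be run outside of Ironic. It is a minimal
sketch under a few assumptions: it uses futurist.ThreadPoolExecutor instead
of the GreenThreadPoolExecutor the conductor uses (so no eventlet
monkey-patching is needed), and the pool sizes, the spawn() and
sleepy_task() helpers, the NoFreeWorker exception and the __main__ demo are
all invented for this note.

    # Illustrative sketch only -- not part of this patch. It mimics the
    # two-executor fallback in BaseConductorManager._spawn_worker().
    import time

    import futurist
    from futurist import rejection

    TOTAL_WORKERS = 20
    RESERVED_PERCENTAGE = 5

    # Reject instead of queueing: futurist checks the backlog before a
    # worker has a chance to pick the item up, hence 1 rather than 0
    # (same reasoning as the NOTE in _init_executors).
    _reject = rejection.reject_when_reached(1)

    reserved = int(TOTAL_WORKERS * RESERVED_PERCENTAGE / 100)   # -> 1
    normal = TOTAL_WORKERS - reserved                           # -> 19

    normal_pool = futurist.ThreadPoolExecutor(
        max_workers=normal, check_and_reject=_reject)
    reserved_pool = futurist.ThreadPoolExecutor(
        max_workers=reserved, check_and_reject=_reject) if reserved else None


    class NoFreeWorker(Exception):
        """Stand-in for ironic.common.exception.NoFreeConductorWorker."""


    def spawn(func, *args, allow_reserved=True, **kwargs):
        """Submit to the normal pool, falling back to the reserved one."""
        try:
            return normal_pool.submit(func, *args, **kwargs)
        except futurist.RejectedSubmission:
            if not allow_reserved or reserved_pool is None:
                raise NoFreeWorker()
            try:
                return reserved_pool.submit(func, *args, **kwargs)
            except futurist.RejectedSubmission:
                raise NoFreeWorker()


    def sleepy_task(name):
        time.sleep(2)
        return name


    if __name__ == '__main__':
        # Fill the normal pool (plus one queued item) with "heartbeat-like"
        # work that is not allowed to spill into the reserved pool.
        batch = []
        for i in range(normal + 1):
            batch.append(spawn(sleepy_task, i, allow_reserved=False))
            time.sleep(0.05)  # give a worker time to dequeue the item

        # An "API-like" request still gets through via the reserved pool...
        critical = spawn(sleepy_task, 'critical')
        # ...while another heartbeat is rejected outright.
        try:
            spawn(sleepy_task, 'heartbeat', allow_reserved=False)
        except NoFreeWorker:
            print('heartbeat rejected, conductor stays responsive')

        print('critical request finished:', critical.result())
        print('batch finished:', len([f.result() for f in batch]), 'tasks')

One consequence of the integer math worth noting: reserved_workers =
int(total_workers * reserved_percentage / 100) truncates, so with the
default 5% any pool smaller than 20 workers gets a reserved size of 0. In
that case _init_executors() sets _reserved_executor to None and the
fallback is skipped entirely, exactly as if the option had been set to 0.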