Merge "Add a reserved workers pool (5% by default)"
commit a7ce0d7ae6
@@ -149,6 +149,13 @@ using more than one CPU core.
 If you use JSON RPC, you also need to make sure the ports don't conflict by
 setting the :oslo.config:option:`json_rpc.port` option.
 
+Starting with the 2024.1 "Caracal" release cycle, a small proportion of the
+threads (specified by the
+:oslo.config:option:`conductor.reserved_workers_pool_percentage` option) is
+reserved for API requests and other critical tasks. Periodic tasks and agent
+heartbeats cannot use them. This ensures that the API stays responsive even
+under extreme internal load.
+
 .. _eventlet: https://eventlet.net/
 
 Database
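As an illustration of the documented behaviour (this snippet is not part of the change itself), the reservation is driven by one option in the `[conductor]` section of `ironic.conf`; `workers_pool_size` is the pre-existing option for the total pool size:

    [conductor]
    # Total size of the conductor's green thread pool (pre-existing option;
    # the value below is illustrative, not the project default).
    workers_pool_size = 100
    # Keep 5% of the pool for API requests and other critical tasks.
    reserved_workers_pool_percentage = 5

With these illustrative values, 5 of the 100 threads would be reserved and the remaining 95 would serve periodic tasks, agent heartbeats and other normal work.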
@@ -99,6 +99,29 @@ class BaseConductorManager(object):
         # clear all locks held by this conductor before registering
         self.dbapi.clear_node_reservations_for_conductor(self.host)
 
+    def _init_executors(self, total_workers, reserved_percentage):
+        # NOTE(dtantsur): do not allow queuing work. Given our model, it's
+        # better to reject an incoming request with HTTP 503 or reschedule
+        # a periodic task than to end up with a hidden backlog that is hard
+        # to track and debug. Using 1 instead of 0 because of how things are
+        # ordered in futurist (it checks for rejection first).
+        rejection_func = rejection.reject_when_reached(1)
+
+        reserved_workers = int(total_workers * reserved_percentage / 100)
+        remaining = total_workers - reserved_workers
+        LOG.info("Starting workers pool: %d normal workers + %d reserved",
+                 remaining, reserved_workers)
+
+        self._executor = futurist.GreenThreadPoolExecutor(
+            max_workers=remaining,
+            check_and_reject=rejection_func)
+        if reserved_workers:
+            self._reserved_executor = futurist.GreenThreadPoolExecutor(
+                max_workers=reserved_workers,
+                check_and_reject=rejection_func)
+        else:
+            self._reserved_executor = None
+
     def init_host(self, admin_context=None, start_consoles=True,
                   start_allocations=True):
         """Initialize the conductor host.
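Note that the split above truncates: `int(total_workers * reserved_percentage / 100)` rounds down, so a small pool combined with a low percentage can yield zero reserved workers, in which case `_reserved_executor` is set to `None` and the fallback path is skipped entirely. A minimal standalone sketch of just that arithmetic, with illustrative inputs (not Ironic defaults):

    # Sketch of the pool-split arithmetic used by _init_executors above.
    def split_pool(total_workers, reserved_percentage):
        reserved = int(total_workers * reserved_percentage / 100)  # truncates
        return total_workers - reserved, reserved

    print(split_pool(100, 5))  # (95, 5)
    print(split_pool(10, 5))   # (10, 0) -> no reserved executor is created
    print(split_pool(300, 5))  # (285, 15)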
@@ -125,16 +148,8 @@ class BaseConductorManager(object):
         self._keepalive_evt = threading.Event()
         """Event for the keepalive thread."""
 
-        # NOTE(dtantsur): do not allow queuing work. Given our model, it's
-        # better to reject an incoming request with HTTP 503 or reschedule
-        # a periodic task than to end up with a hidden backlog that is hard
-        # to track and debug. Using 1 instead of 0 because of how things are
-        # ordered in futurist (it checks for rejection first).
-        rejection_func = rejection.reject_when_reached(1)
-        self._executor = futurist.GreenThreadPoolExecutor(
-            max_workers=CONF.conductor.workers_pool_size,
-            check_and_reject=rejection_func)
-        """Executor for performing tasks async."""
+        self._init_executors(CONF.conductor.workers_pool_size,
+                             CONF.conductor.reserved_workers_pool_percentage)
 
         # TODO(jroll) delete the use_groups argument and use the default
         # in Stein.
@@ -358,6 +373,8 @@ class BaseConductorManager(object):
         # having work complete normally.
         self._periodic_tasks.stop()
         self._periodic_tasks.wait()
+        if self._reserved_executor is not None:
+            self._reserved_executor.shutdown(wait=True)
         self._executor.shutdown(wait=True)
 
         if self._zeroconf is not None:
@@ -453,7 +470,8 @@ class BaseConductorManager(object):
             if self._mapped_to_this_conductor(*result[:3]):
                 yield result
 
-    def _spawn_worker(self, func, *args, **kwargs):
+    def _spawn_worker(self, func, *args, _allow_reserved_pool=True,
+                      **kwargs):
 
         """Create a greenthread to run func(*args, **kwargs).
 
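Because `_allow_reserved_pool` is declared after `*args`, it is keyword-only: positional arguments intended for `func` cannot collide with it, and callers have to opt out explicitly by name. A small demonstration of the signature pattern (`spawn` here is a hypothetical stand-in, not the Ironic method itself):

    # Keyword-only parameter after *args: positional arguments flow to func,
    # while the flag can only be set by name.
    def spawn(func, *args, _allow_reserved_pool=True, **kwargs):
        return func(*args, **kwargs), _allow_reserved_pool

    def add(a, b):
        return a + b

    print(spawn(add, 1, 2))                              # (3, True)
    print(spawn(add, 1, 2, _allow_reserved_pool=False))  # (3, False)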
@@ -466,6 +484,14 @@
         """
         try:
             return self._executor.submit(func, *args, **kwargs)
         except futurist.RejectedSubmission:
-            raise exception.NoFreeConductorWorker()
+            if not _allow_reserved_pool or self._reserved_executor is None:
+                raise exception.NoFreeConductorWorker()
+
+            LOG.debug('Normal workers pool is full, using reserved pool to run %s',
+                      func.__qualname__)
+            try:
+                return self._reserved_executor.submit(func, *args, **kwargs)
+            except futurist.RejectedSubmission:
+                raise exception.NoFreeConductorWorker()
 
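The two-tier submission above can be exercised with futurist alone. A self-contained sketch under these assumptions: it uses the plain `ThreadPoolExecutor` (so it runs without eventlet, unlike the conductor's green variant), tiny illustrative pool sizes, and a hypothetical `submit_with_fallback` helper that mirrors `_spawn_worker`:

    import time

    import futurist
    from futurist import rejection

    # Same rejection policy as the conductor: refuse work once one item is
    # already waiting in an executor's backlog.
    reject = rejection.reject_when_reached(1)

    normal = futurist.ThreadPoolExecutor(max_workers=1, check_and_reject=reject)
    reserved = futurist.ThreadPoolExecutor(max_workers=1, check_and_reject=reject)

    def busy():
        time.sleep(2)

    def submit_with_fallback(func):
        # Mirrors _spawn_worker: try the normal pool first, fall back to the
        # reserved pool, and give up only when both reject the work.
        try:
            return normal.submit(func)
        except futurist.RejectedSubmission:
            try:
                return reserved.submit(func)
            except futurist.RejectedSubmission:
                # Ironic raises exception.NoFreeConductorWorker here.
                raise RuntimeError('no free workers')

    # Submit more work than both pools can hold; once both backlogs fill,
    # later submissions are rejected by both executors.
    for i in range(6):
        try:
            submit_with_fallback(busy)
        except RuntimeError as exc:
            print('task %d: %s' % (i, exc))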
@@ -3213,7 +3213,10 @@ class ConductorManager(base_manager.BaseConductorManager):
             task.spawn_after(
                 self._spawn_worker, task.driver.deploy.heartbeat,
                 task, callback_url, agent_version, agent_verify_ca,
-                agent_status, agent_status_message)
+                agent_status, agent_status_message,
+                # NOTE(dtantsur): heartbeats are not that critical to allow
+                # them to potentially overload the conductor.
+                _allow_reserved_pool=False)
 
     @METRICS.timer('ConductorManager.vif_list')
     @messaging.expected_exceptions(exception.NetworkError,
@@ -28,6 +28,12 @@ opts = [
                       'itself for handling heart beats and periodic tasks. '
                       'On top of that, `sync_power_state_workers` will take '
                       'up to 7 green threads with the default value of 8.')),
+    cfg.IntOpt('reserved_workers_pool_percentage',
+               default=5, min=0, max=50,
+               help=_('The percentage of the whole workers pool that will be '
+                      'kept for API requests and other important tasks. '
+                      'This part of the pool will not be used for periodic '
+                      'tasks or agent heartbeats. Set to 0 to disable.')),
     cfg.IntOpt('heartbeat_interval',
                default=10,
                help=_('Seconds between conductor heart beats.')),
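As the help text notes, setting the option to 0 turns the feature off, so no reserved executor is created and submission behaves as before. A hypothetical `ironic.conf` excerpt for that case:

    [conductor]
    # Disable the reserved pool; all workers serve all task types.
    reserved_workers_pool_percentage = 0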
@@ -402,20 +402,55 @@ class ManagerSpawnWorkerTestCase(tests_base.TestCase):
     def setUp(self):
         super(ManagerSpawnWorkerTestCase, self).setUp()
         self.service = manager.ConductorManager('hostname', 'test-topic')
-        self.executor = mock.Mock(spec=futurist.GreenThreadPoolExecutor)
-        self.service._executor = self.executor
+        self.service._executor = mock.Mock(
+            spec=futurist.GreenThreadPoolExecutor)
+        self.service._reserved_executor = mock.Mock(
+            spec=futurist.GreenThreadPoolExecutor)
+
+        self.func = lambda: None
 
     def test__spawn_worker(self):
-        self.service._spawn_worker('fake', 1, 2, foo='bar', cat='meow')
+        self.service._spawn_worker(self.func, 1, 2, foo='bar', cat='meow')
 
-        self.executor.submit.assert_called_once_with(
-            'fake', 1, 2, foo='bar', cat='meow')
+        self.service._executor.submit.assert_called_once_with(
+            self.func, 1, 2, foo='bar', cat='meow')
+        self.service._reserved_executor.submit.assert_not_called()
 
-    def test__spawn_worker_none_free(self):
-        self.executor.submit.side_effect = futurist.RejectedSubmission()
+    def test__spawn_worker_reserved(self):
+        self.service._executor.submit.side_effect = \
+            futurist.RejectedSubmission()
+
+        self.service._spawn_worker(self.func, 1, 2, foo='bar', cat='meow')
+
+        self.service._executor.submit.assert_called_once_with(
+            self.func, 1, 2, foo='bar', cat='meow')
+        self.service._reserved_executor.submit.assert_called_once_with(
+            self.func, 1, 2, foo='bar', cat='meow')
+
+    def test__spawn_worker_cannot_use_reserved(self):
+        self.service._executor.submit.side_effect = \
+            futurist.RejectedSubmission()
 
         self.assertRaises(exception.NoFreeConductorWorker,
-                          self.service._spawn_worker, 'fake')
+                          self.service._spawn_worker, self.func,
+                          _allow_reserved_pool=False)
+
+    def test__spawn_worker_no_reserved(self):
+        self.service._executor.submit.side_effect = \
+            futurist.RejectedSubmission()
+        self.service._reserved_executor = None
+
+        self.assertRaises(exception.NoFreeConductorWorker,
+                          self.service._spawn_worker, self.func)
+
+    def test__spawn_worker_none_free(self):
+        self.service._executor.submit.side_effect = \
+            futurist.RejectedSubmission()
+        self.service._reserved_executor.submit.side_effect = \
+            futurist.RejectedSubmission()
+
+        self.assertRaises(exception.NoFreeConductorWorker,
+                          self.service._spawn_worker, self.func)
 
 
 @mock.patch.object(objects.Conductor, 'unregister_all_hardware_interfaces',
@@ -7609,6 +7609,15 @@ class DoNodeAdoptionTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
         self.assertEqual(states.NOSTATE, node.target_provision_state)
         self.assertIsNone(node.last_error)
 
+
+@mgr_utils.mock_record_keepalive
+class HeartbeatTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
+
+    def _fake_spawn(self, conductor_obj, func, *args, **kwargs):
+        self.assertFalse(kwargs.pop('_allow_reserved_pool'))
+        func(*args, **kwargs)
+        return mock.MagicMock()
+
     # TODO(TheJulia): We should double check if these heartbeat tests need
     # to move. I have this strange feeling we were lacking rpc testing of
     # heartbeat until we did adoption testing....
@@ -0,0 +1,6 @@
+---
+fixes:
+  - |
+    Each conductor now reserves a small proportion of its worker threads (5%
+    by default) for API requests and other critical tasks. This ensures that
+    the API stays responsive even under extreme internal load.