Parallelize periodic power sync calls

Node power sync is performed from a periodic task. In that task
all nodes are iterated over and a power sync call is made for
each of them. While the power sync call itself is non-blocking
relative to other concurrent I/O tasks, the iteration over the
nodes appears to be sequential, meaning that node power sync is
performed one node at a time.
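
A minimal sketch of the sequential pattern described above, with
illustrative names (the real loop is visible as the removed lines in
the ConductorManager hunk further down):

# Illustrative sketch only, not the actual Ironic code.
def sync_power_states_sequentially(nodes, sync_one_node):
    for node in nodes:
        # The next node is not touched until this call returns, so a
        # single slow or dead BMC stalls the whole pass.
        sync_one_node(node)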

If the above observation holds, large-scale deployments may never
be able to power sync all of their nodes properly, as the periodic
task gets throttled at walking all the active nodes within its
60-second period.

This patch distributes the power sync calls over a number of green
threads, each working on a portion of the nodes to be taken care
of.
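
A minimal sketch of that approach, assuming eventlet-style green
threads and simplified names (the actual change, shown in the
ConductorManager hunk below, goes through the conductor's
_spawn_worker pool and futurist waiters instead of a private
GreenPool):

# Illustrative sketch only; the eventlet pool and names are assumptions.
import queue

import eventlet


def sync_power_states_in_parallel(nodes, sync_one_node, workers=8):
    work = queue.Queue()
    for node in nodes:
        work.put(node)

    def drain_queue():
        # Each worker keeps pulling nodes until the shared queue is empty.
        while True:
            try:
                node = work.get_nowait()
            except queue.Empty:
                return
            # Concurrency requires sync_one_node to perform green
            # (monkey-patched) I/O so the thread yields while waiting.
            sync_one_node(node)

    pool = eventlet.GreenPool()
    # Spawn workers - 1 helpers; the calling thread acts as the last one.
    for _ in range(max(0, workers - 1)):
        pool.spawn(drain_queue)

    drain_queue()
    pool.waitall()

Sharing one queue instead of pre-partitioning the node list lets fast
workers naturally pick up the slack left by workers stuck on slow BMCs.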

Change-Id: I80297c877d9a87d3bd8fc30d0ed65cd443f200b3
Ilya Etingof 2019-01-18 19:53:51 +01:00
parent fa6a93b881
commit 7448603ab8
4 changed files with 128 additions and 7 deletions


@@ -1636,10 +1636,40 @@ class ConductorManager(base_manager.BaseConductorManager):
     @periodics.periodic(spacing=CONF.conductor.sync_power_state_interval,
                         enabled=CONF.conductor.sync_power_state_interval > 0)
     def _sync_power_states(self, context):
-        """Periodic task to sync power states for the nodes.
+        """Periodic task to sync power states for the nodes."""
+        filters = {'maintenance': False}
+
+        nodes = queue.Queue()
+        for node_info in self.iter_nodes(fields=['id'], filters=filters):
+            nodes.put(node_info)
 
-        Attempt to grab a lock and sync only if the following
-        conditions are met:
+        number_of_threads = min(CONF.conductor.sync_power_state_workers,
+                                CONF.conductor.periodic_max_workers,
+                                nodes.qsize())
+        futures = []
+
+        for thread_number in range(max(0, number_of_threads - 1)):
+            try:
+                futures.append(
+                    self._spawn_worker(self._sync_power_state_nodes_task,
+                                       context, nodes))
+            except exception.NoFreeConductorWorker:
+                LOG.warning("There are no more conductor workers for "
+                            "power sync task. %(workers)d workers have "
+                            "been already spawned.",
+                            {'workers': thread_number})
+                break
+
+        try:
+            self._sync_power_state_nodes_task(context, nodes)
+        finally:
+            waiters.wait_for_all(futures)
+
+    def _sync_power_state_nodes_task(self, context, nodes):
+        """Invokes power state sync on nodes from synchronized queue.
+
+        Attempt to grab a lock and sync only if the following conditions
+        are met:
 
         1) Node is mapped to this conductor.
         2) Node is not in maintenance mode.
@@ -1665,9 +1695,13 @@ class ConductorManager(base_manager.BaseConductorManager):
         # (through to its DB API call) so that we can eliminate our call
         # and first set of checks below.
         filters = {'maintenance': False}
-        node_iter = self.iter_nodes(fields=['id'], filters=filters)
 
-        for (node_uuid, driver, conductor_group, node_id) in node_iter:
+        while not self._shutdown:
+            try:
+                (node_uuid, driver, conductor_group,
+                 node_id) = nodes.get_nowait()
+            except queue.Empty:
+                break
             try:
                 # NOTE(dtantsur): start with a shared lock, upgrade if needed
                 with task_manager.acquire(context, node_uuid,


@@ -24,7 +24,9 @@ opts = [
                default=100, min=3,
                help=_('The size of the workers greenthread pool. '
                       'Note that 2 threads will be reserved by the conductor '
-                      'itself for handling heart beats and periodic tasks.')),
+                      'itself for handling heart beats and periodic tasks. '
+                      'On top of that, `sync_power_state_workers` will take '
+                      'up to 7 green threads with the default value of 8.')),
     cfg.IntOpt('heartbeat_interval',
                default=10,
                help=_('Seconds between conductor heart beats.')),
@@ -74,6 +76,11 @@ opts = [
                       'number of times Ironic should try syncing the '
                       'hardware node power state with the node power state '
                       'in DB')),
+    cfg.IntOpt('sync_power_state_workers',
+               default=8, min=1,
+               help=_('The maximum number of workers that can be started '
+                      'simultaneously to sync nodes power state from the '
+                      'periodic task.')),
     cfg.IntOpt('periodic_max_workers',
                default=8,
                help=_('Maximum number of worker threads that can be started '
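
As a back-of-the-envelope check of the "up to 7 green threads" wording
added to the workers pool help string above, here is the same
arithmetic the new _sync_power_states() code performs, with
illustrative local names and a hypothetical node count:

# Illustrative arithmetic only, mirroring the min()/N-1 computation in
# the ConductorManager hunk above.
sync_power_state_workers = 8   # new option, default value
periodic_max_workers = 8       # existing option, default value
queued_nodes = 1000            # hypothetical number of nodes to sync

number_of_threads = min(sync_power_state_workers,
                        periodic_max_workers,
                        queued_nodes)
extra_green_threads = max(0, number_of_threads - 1)
print(extra_green_threads)  # 7: the periodic task thread is the 8th worker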


@@ -22,6 +22,7 @@ from collections import namedtuple
 import datetime
 
 import eventlet
+from futurist import waiters
 import mock
 from oslo_config import cfg
 from oslo_db import exception as db_exception
@@ -6313,6 +6314,10 @@ class ManagerDoSyncPowerStateTestCase(db_base.DbTestCase):
         self.task.upgrade_lock.assert_called_once_with()
 
 
+@mock.patch.object(waiters, 'wait_for_all',
+                   new=mock.MagicMock(return_value=(0, 0)))
+@mock.patch.object(manager.ConductorManager, '_spawn_worker',
+                   new=lambda self, fun, *args: fun(*args))
 @mock.patch.object(manager, 'do_sync_power_state')
 @mock.patch.object(task_manager, 'acquire')
 @mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
@@ -7037,6 +7042,72 @@ class ManagerTestHardwareTypeProperties(mgr_utils.ServiceSetUpMixin,
         self._check_hardware_type_properties('manual-management', expected)
 
 
+@mock.patch.object(waiters, 'wait_for_all')
+@mock.patch.object(manager.ConductorManager, '_spawn_worker')
+@mock.patch.object(manager.ConductorManager, '_sync_power_state_nodes_task')
+class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn):
+
+    def setUp(self):
+        super(ParallelPowerSyncTestCase, self).setUp()
+        self.service = manager.ConductorManager('hostname', 'test-topic')
+
+    def test__sync_power_states_9_nodes_8_workers(
+            self, sync_mock, spawn_mock, waiter_mock):
+
+        CONF.set_override('sync_power_state_workers', 8, group='conductor')
+
+        with mock.patch.object(self.service, 'iter_nodes',
+                               new=mock.MagicMock(return_value=[None] * 9)):
+            self.service._sync_power_states(self.context)
+
+        self.assertEqual(7, spawn_mock.call_count)
+        self.assertEqual(1, sync_mock.call_count)
+        self.assertEqual(1, waiter_mock.call_count)
+
+    def test__sync_power_states_6_nodes_8_workers(
+            self, sync_mock, spawn_mock, waiter_mock):
+
+        CONF.set_override('sync_power_state_workers', 8, group='conductor')
+
+        with mock.patch.object(self.service, 'iter_nodes',
+                               new=mock.MagicMock(return_value=[None] * 6)):
+            self.service._sync_power_states(self.context)
+
+        self.assertEqual(5, spawn_mock.call_count)
+        self.assertEqual(1, sync_mock.call_count)
+        self.assertEqual(1, waiter_mock.call_count)
+
+    def test__sync_power_states_1_nodes_8_workers(
+            self, sync_mock, spawn_mock, waiter_mock):
+
+        CONF.set_override('sync_power_state_workers', 8, group='conductor')
+
+        with mock.patch.object(self.service, 'iter_nodes',
+                               new=mock.MagicMock(return_value=[None])):
+            self.service._sync_power_states(self.context)
+
+        self.assertEqual(0, spawn_mock.call_count)
+        self.assertEqual(1, sync_mock.call_count)
+        self.assertEqual(1, waiter_mock.call_count)
+
+    def test__sync_power_states_9_nodes_1_worker(
+            self, sync_mock, spawn_mock, waiter_mock):
+
+        CONF.set_override('sync_power_state_workers', 1, group='conductor')
+
+        with mock.patch.object(self.service, 'iter_nodes',
+                               new=mock.MagicMock(return_value=[None] * 9)):
+            self.service._sync_power_states(self.context)
+
+        self.assertEqual(0, spawn_mock.call_count)
+        self.assertEqual(9, sync_mock.call_count)
+        self.assertEqual(1, waiter_mock.call_count)
+
+
 @mock.patch.object(task_manager, 'acquire')
 @mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
 @mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')


@@ -0,0 +1,9 @@
+---
+features:
+  - |
+    Parallelizes periodic power sync calls by running up to
+    ``sync_power_state_workers`` simultaneously. The default is to run
+    up to ``8`` workers.
+    This change should let larger-scale setups run power syncs more
+    frequently and make the whole power sync procedure more resilient
+    to slow or dead BMCs.