Prioritize sloppy nodes for power sync
When scheduling nodes for periodic power sync, handle the nodes that have recently failed power sync first. The aim is to fail non-responsive nodes fast.

Change-Id: Id7f654369843c28b5edc213046f82c7bf15fed85
parent 506cb12160
commit 0d798372e1
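
The core of the change, sketched standalone before the diff for clarity. This is a minimal sketch, not the exact conductor code: the node IDs and failure counts below are invented, and power_state_sync_count stands in for the conductor attribute of the same name used in the diff.

    import queue

    # Invented sample data: node IDs mapped to recent power-sync failure
    # counts, standing in for the conductor's power_state_sync_count.
    power_state_sync_count = {'node-a': 0, 'node-b': 3, 'node-c': 1}

    # Nodes as ('id',) tuples, mimicking iter_nodes(fields=['id']) output.
    node_infos = [('node-a',), ('node-b',), ('node-c',)]

    # Negating the count turns ascending sorted() into a descending sort,
    # so the nodes with the most recent failures come out first.
    prioritized = sorted(
        node_infos,
        key=lambda n: -power_state_sync_count.get(n[0], 0))

    nodes_queue = queue.Queue()
    for node_info in prioritized:
        nodes_queue.put(node_info)

    # Workers draining the queue see node-b (3 failures), then node-c (1),
    # then node-a (0): non-responsive nodes are retried, and failed, first.
    print([nodes_queue.get() for _ in range(nodes_queue.qsize())])
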
@@ -1665,20 +1665,28 @@ class ConductorManager(base_manager.BaseConductorManager):

     def _sync_power_states(self, context):
         """Periodic task to sync power states for the nodes."""
         filters = {'maintenance': False}
-        nodes = queue.Queue()
-        for node_info in self.iter_nodes(fields=['id'], filters=filters):
-            nodes.put(node_info)
+
+        # NOTE(etingof): prioritize non-responding nodes to fail them fast
+        nodes = sorted(
+            self.iter_nodes(fields=['id'], filters=filters),
+            key=lambda n: -self.power_state_sync_count.get(n[0], 0)
+        )
+
+        nodes_queue = queue.Queue()
+
+        for node_info in nodes:
+            nodes_queue.put(node_info)

         number_of_workers = min(CONF.conductor.sync_power_state_workers,
                                 CONF.conductor.periodic_max_workers,
-                                nodes.qsize())
+                                nodes_queue.qsize())
         futures = []

         for worker_number in range(max(0, number_of_workers - 1)):
             try:
                 futures.append(
                     self._spawn_worker(self._sync_power_state_nodes_task,
-                                       context, nodes))
+                                       context, nodes_queue))
             except exception.NoFreeConductorWorker:
                 LOG.warning("There are no more conductor workers for "
                             "power sync task. %(workers)d workers have "
@@ -1687,7 +1695,7 @@ class ConductorManager(base_manager.BaseConductorManager):
                 break

         try:
-            self._sync_power_state_nodes_task(context, nodes)
+            self._sync_power_state_nodes_task(context, nodes_queue)
         finally:
             waiters.wait_for_all(futures)
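
Worth noting about the sort key above: Python's sorted() is stable, so nodes with equal failure counts keep the order in which iter_nodes() returned them; only the failure count reorders the list. A quick standalone check (sample counts invented):

    counts = {1: 2, 2: 2, 3: 0}
    nodes = [[1], [2], [3]]
    # Nodes 1 and 2 tie on failure count and so keep their relative order.
    assert sorted(nodes, key=lambda n: -counts.get(n[0], 0)) == [[1], [2], [3]]
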
@@ -7160,7 +7160,7 @@ class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
         CONF.set_override('sync_power_state_workers', 8, group='conductor')

         with mock.patch.object(self.service, 'iter_nodes',
-                               new=mock.MagicMock(return_value=[None] * 9)):
+                               new=mock.MagicMock(return_value=[[0]] * 9)):
             self.service._sync_power_states(self.context)

@@ -7174,7 +7174,7 @@ class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
         CONF.set_override('sync_power_state_workers', 8, group='conductor')

         with mock.patch.object(self.service, 'iter_nodes',
-                               new=mock.MagicMock(return_value=[None] * 6)):
+                               new=mock.MagicMock(return_value=[[0]] * 6)):
             self.service._sync_power_states(self.context)

@@ -7188,7 +7188,7 @@ class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
         CONF.set_override('sync_power_state_workers', 8, group='conductor')

         with mock.patch.object(self.service, 'iter_nodes',
-                               new=mock.MagicMock(return_value=[None])):
+                               new=mock.MagicMock(return_value=[[0]])):
             self.service._sync_power_states(self.context)

@@ -7202,7 +7202,7 @@ class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
         CONF.set_override('sync_power_state_workers', 1, group='conductor')

         with mock.patch.object(self.service, 'iter_nodes',
-                               new=mock.MagicMock(return_value=[None] * 9)):
+                               new=mock.MagicMock(return_value=[[0]] * 9)):
             self.service._sync_power_states(self.context)

@@ -7210,6 +7210,26 @@ class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
         self.assertEqual(1, sync_mock.call_count)
         self.assertEqual(1, waiter_mock.call_count)

+    @mock.patch.object(queue, 'Queue', autospec=True)
+    def test__sync_power_states_node_prioritization(
+            self, queue_mock, sync_mock, spawn_mock, waiter_mock):
+        CONF.set_override('sync_power_state_workers', 1, group='conductor')
+
+        with mock.patch.object(
+                self.service, 'iter_nodes',
+                new=mock.MagicMock(return_value=[[0], [1], [2]])
+        ), mock.patch.dict(
+                self.service.power_state_sync_count,
+                {0: 1, 1: 0, 2: 2}, clear=True):
+
+            queue_mock.return_value.qsize.return_value = 0
+
+            self.service._sync_power_states(self.context)
+
+            expected_calls = [mock.call([2]), mock.call([0]), mock.call([1])]
+            queue_mock.return_value.put.assert_has_calls(expected_calls)
+
     @mock.patch.object(task_manager, 'acquire')
     @mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
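
The expected_calls ordering in the new test follows directly from the sort key: with failure counts {0: 1, 1: 0, 2: 2}, descending order is node 2, then node 0, then node 1. A standalone check of that arithmetic:

    counts = {0: 1, 1: 0, 2: 2}
    nodes = [[0], [1], [2]]
    # Descending failure count: node 2 (2), node 0 (1), node 1 (0).
    assert sorted(nodes, key=lambda n: -counts.get(n[0], 0)) == [[2], [0], [1]]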