Merge "Concurrent Distructive/Intensive ops limits"

This commit is contained in:
Zuul 2022-09-21 16:38:35 +00:00 committed by Gerrit Code Review
commit eeeaa274cf
9 changed files with 287 additions and 4 deletions

View File

@ -973,3 +973,40 @@ Unfortunately, due to the way the conductor is designed, it is not possible to
gracefully break a stuck lock held in ``*-ing`` states. As the last resort, you gracefully break a stuck lock held in ``*-ing`` states. As the last resort, you
may need to restart the affected conductor. See `Why are my nodes stuck in a may need to restart the affected conductor. See `Why are my nodes stuck in a
"-ing" state?`_. "-ing" state?`_.
What is ConcurrentActionLimit?
==============================
ConcurrentActionLimit is an exception which is raised to clients when an
operation is requested, but cannot be serviced at that moment because the
overall threshold of nodes in concurrent "Deployment" or "Cleaning"
operations has been reached.
These limits exist for two distinct reasons.
The first is they allow an operator to tune a deployment such that too many
concurrent deployments cannot be triggered at any given time, as a single
conductor has an internal limit to the number of overall concurrent tasks,
this restricts only the number of running concurrent actions. As such, this
accounts for the number of nodes in ``deploy`` and ``deploy wait`` states.
In the case of deployments, the default value is relatively high and should
be suitable for *most* larger operators.
The second is to help slow down the ability in which an entire population of
baremetal nodes can be moved into and through cleaning, in order to help
guard against authenticated malicious users, or accidental script driven
operations. In this case, the total number of nodes in ``deleting``,
``cleaning``, and ``clean wait`` are evaluated. The default maximum limit
for cleaning operations is *50* and should be suitable for the majority of
baremetal operators.
These settings can be modified by using the
``[conductor]max_concurrent_deploy`` and ``[conductor]max_concurrent_clean``
settings from the ironic.conf file supporting the ``ironic-conductor``
service. Neither setting can be explicity disabled, however there is also no
upper limit to the setting.
.. note::
This was an infrastructure operator requested feature from actual lessons
learned in the operation of Ironic in large scale production. The defaults
may not be suitable for the largest scale operators.

View File

@ -851,3 +851,13 @@ class ImageRefIsARedirect(IronicException):
message=msg, message=msg,
image_ref=image_ref, image_ref=image_ref,
redirect_url=redirect_url) redirect_url=redirect_url)
class ConcurrentActionLimit(IronicException):
# NOTE(TheJulia): We explicitly don't report the concurrent
# action limit configuration value as a security guard since
# if informed of the limit, an attacker can tailor their attack.
_msg_fmt = _("Unable to process request at this time. "
"The concurrent action limit for %(task_type)s "
"has been reached. Please contact your administrator "
"and try again later.")

View File

@ -886,7 +886,8 @@ class ConductorManager(base_manager.BaseConductorManager):
exception.NodeInMaintenance, exception.NodeInMaintenance,
exception.InstanceDeployFailure, exception.InstanceDeployFailure,
exception.InvalidStateRequested, exception.InvalidStateRequested,
exception.NodeProtected) exception.NodeProtected,
exception.ConcurrentActionLimit)
def do_node_deploy(self, context, node_id, rebuild=False, def do_node_deploy(self, context, node_id, rebuild=False,
configdrive=None, deploy_steps=None): configdrive=None, deploy_steps=None):
"""RPC method to initiate deployment to a node. """RPC method to initiate deployment to a node.
@ -910,8 +911,11 @@ class ConductorManager(base_manager.BaseConductorManager):
:raises: InvalidStateRequested when the requested state is not a valid :raises: InvalidStateRequested when the requested state is not a valid
target from the current state. target from the current state.
:raises: NodeProtected if the node is protected. :raises: NodeProtected if the node is protected.
:raises: ConcurrentActionLimit if this action would exceed the maximum
number of configured concurrent actions of this type.
""" """
LOG.debug("RPC do_node_deploy called for node %s.", node_id) LOG.debug("RPC do_node_deploy called for node %s.", node_id)
self._concurrent_action_limit(action='provisioning')
event = 'rebuild' if rebuild else 'deploy' event = 'rebuild' if rebuild else 'deploy'
# NOTE(comstud): If the _sync_power_states() periodic task happens # NOTE(comstud): If the _sync_power_states() periodic task happens
@ -983,7 +987,8 @@ class ConductorManager(base_manager.BaseConductorManager):
exception.NodeLocked, exception.NodeLocked,
exception.InstanceDeployFailure, exception.InstanceDeployFailure,
exception.InvalidStateRequested, exception.InvalidStateRequested,
exception.NodeProtected) exception.NodeProtected,
exception.ConcurrentActionLimit)
def do_node_tear_down(self, context, node_id): def do_node_tear_down(self, context, node_id):
"""RPC method to tear down an existing node deployment. """RPC method to tear down an existing node deployment.
@ -998,8 +1003,11 @@ class ConductorManager(base_manager.BaseConductorManager):
:raises: InvalidStateRequested when the requested state is not a valid :raises: InvalidStateRequested when the requested state is not a valid
target from the current state. target from the current state.
:raises: NodeProtected if the node is protected. :raises: NodeProtected if the node is protected.
:raises: ConcurrentActionLimit if this action would exceed the maximum
number of configured concurrent actions of this type.
""" """
LOG.debug("RPC do_node_tear_down called for node %s.", node_id) LOG.debug("RPC do_node_tear_down called for node %s.", node_id)
self._concurrent_action_limit(action='unprovisioning')
with task_manager.acquire(context, node_id, shared=False, with task_manager.acquire(context, node_id, shared=False,
purpose='node tear down') as task: purpose='node tear down') as task:
@ -1121,7 +1129,8 @@ class ConductorManager(base_manager.BaseConductorManager):
exception.InvalidStateRequested, exception.InvalidStateRequested,
exception.NodeInMaintenance, exception.NodeInMaintenance,
exception.NodeLocked, exception.NodeLocked,
exception.NoFreeConductorWorker) exception.NoFreeConductorWorker,
exception.ConcurrentActionLimit)
def do_node_clean(self, context, node_id, clean_steps, def do_node_clean(self, context, node_id, clean_steps,
disable_ramdisk=False): disable_ramdisk=False):
"""RPC method to initiate manual cleaning. """RPC method to initiate manual cleaning.
@ -1150,7 +1159,10 @@ class ConductorManager(base_manager.BaseConductorManager):
:raises: NodeLocked if node is locked by another conductor. :raises: NodeLocked if node is locked by another conductor.
:raises: NoFreeConductorWorker when there is no free worker to start :raises: NoFreeConductorWorker when there is no free worker to start
async task. async task.
:raises: ConcurrentActionLimit If this action would exceed the
configured limits of the deployment.
""" """
self._concurrent_action_limit(action='cleaning')
with task_manager.acquire(context, node_id, shared=False, with task_manager.acquire(context, node_id, shared=False,
purpose='node manual cleaning') as task: purpose='node manual cleaning') as task:
node = task.node node = task.node
@ -3549,6 +3561,40 @@ class ConductorManager(base_manager.BaseConductorManager):
# impact DB access if done in excess. # impact DB access if done in excess.
eventlet.sleep(0) eventlet.sleep(0)
def _concurrent_action_limit(self, action):
"""Check Concurrency limits and block operations if needed.
This method is used to serve as a central place for the logic
for checks on concurrency limits. If a limit is reached, then
an appropriate exception is raised.
:raises: ConcurrentActionLimit If the system configuration
is exceeded.
"""
# NOTE(TheJulia): Keeping this all in one place for simplicity.
if action == 'provisioning':
node_count = self.dbapi.count_nodes_in_provision_state([
states.DEPLOYING,
states.DEPLOYWAIT
])
if node_count >= CONF.conductor.max_concurrent_deploy:
raise exception.ConcurrentActionLimit(
task_type=action)
if action == 'unprovisioning' or action == 'cleaning':
# NOTE(TheJulia): This also checks for the deleting state
# which is super transitory, *but* you can get a node into
# the state. So in order to guard against a DoS attack, we
# need to check even the super transitory node state.
node_count = self.dbapi.count_nodes_in_provision_state([
states.DELETING,
states.CLEANING,
states.CLEANWAIT
])
if node_count >= CONF.conductor.max_concurrent_clean:
raise exception.ConcurrentActionLimit(
task_type=action)
@METRICS.timer('get_vendor_passthru_metadata') @METRICS.timer('get_vendor_passthru_metadata')
def get_vendor_passthru_metadata(route_dict): def get_vendor_passthru_metadata(route_dict):

View File

@ -358,6 +358,32 @@ opts = [
'model. The conductor does *not* record this value ' 'model. The conductor does *not* record this value '
'otherwise, and this information is not backfilled ' 'otherwise, and this information is not backfilled '
'for prior instances which have been deployed.')), 'for prior instances which have been deployed.')),
cfg.IntOpt('max_concurrent_deploy',
default=250,
min=1,
mutable=True,
help=_('The maximum number of concurrent nodes in deployment '
'which are permitted in this Ironic system. '
'If this limit is reached, new requests will be '
'rejected until the number of deployments in progress '
'is lower than this maximum. As this is a security '
'mechanism requests are not queued, and this setting '
'is a global setting applying to all requests this '
'conductor receives, regardless of access rights. '
'The concurrent deployment limit cannot be disabled.')),
cfg.IntOpt('max_concurrent_clean',
default=50,
min=1,
mutable=True,
help=_('The maximum number of concurrent nodes in cleaning '
'which are permitted in this Ironic system. '
'If this limit is reached, new requests will be '
'rejected until the number of nodes in cleaning '
'is lower than this maximum. As this is a security '
'mechanism requests are not queued, and this setting '
'is a global setting applying to all requests this '
'conductor receives, regardless of access rights. '
'The concurrent clean limit cannot be disabled.')),
] ]

View File

@ -1416,3 +1416,12 @@ class Connection(object, metaclass=abc.ABCMeta):
:param entires: A list of node history entriy id's to be :param entires: A list of node history entriy id's to be
queried for deletion. queried for deletion.
""" """
@abc.abstractmethod
def count_nodes_in_provision_state(self, state):
"""Count the number of nodes in given provision state.
:param state: A provision_state value to match for the
count operation. This can be a single provision
state value or a list of values.
"""

View File

@ -30,6 +30,7 @@ from oslo_utils import timeutils
from oslo_utils import uuidutils from oslo_utils import uuidutils
from osprofiler import sqlalchemy as osp_sqlalchemy from osprofiler import sqlalchemy as osp_sqlalchemy
import sqlalchemy as sa import sqlalchemy as sa
from sqlalchemy import or_
from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound
from sqlalchemy.orm import joinedload from sqlalchemy.orm import joinedload
from sqlalchemy.orm import Load from sqlalchemy.orm import Load
@ -2400,3 +2401,26 @@ class Connection(api.Connection):
).filter( ).filter(
models.NodeHistory.id.in_(entries) models.NodeHistory.id.in_(entries)
).delete(synchronize_session=False) ).delete(synchronize_session=False)
def count_nodes_in_provision_state(self, state):
if not isinstance(state, list):
state = [state]
with _session_for_read() as session:
# Intentionally does not use the full ORM model
# because that is de-duped by pkey, but we already
# have unique constraints on UUID/name, so... shouldn't
# be a big deal. #JuliaFamousLastWords.
# Anyway, intent here is to be as quick as possible and
# literally have the DB do *all* of the world, so no
# client side ops occur. The column is also indexed,
# which means this will be an index based response.
# TODO(TheJulia): This might need to be revised for
# SQLAlchemy 2.0 as it should be a scaler select and count
# instead.
return session.query(
models.Node.provision_state
).filter(
or_(
models.Node.provision_state == v for v in state
)
).count()

View File

@ -1829,6 +1829,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
def test_do_node_deploy_maintenance(self, mock_iwdi): def test_do_node_deploy_maintenance(self, mock_iwdi):
mock_iwdi.return_value = False mock_iwdi.return_value = False
self._start_service()
node = obj_utils.create_test_node(self.context, driver='fake-hardware', node = obj_utils.create_test_node(self.context, driver='fake-hardware',
maintenance=True) maintenance=True)
exc = self.assertRaises(messaging.rpc.ExpectedException, exc = self.assertRaises(messaging.rpc.ExpectedException,
@ -1843,6 +1844,7 @@ class ServiceDoNodeDeployTestCase(mgr_utils.ServiceSetUpMixin,
self.assertFalse(mock_iwdi.called) self.assertFalse(mock_iwdi.called)
def _test_do_node_deploy_validate_fail(self, mock_validate, mock_iwdi): def _test_do_node_deploy_validate_fail(self, mock_validate, mock_iwdi):
self._start_service()
mock_iwdi.return_value = False mock_iwdi.return_value = False
# InvalidParameterValue should be re-raised as InstanceDeployFailure # InvalidParameterValue should be re-raised as InstanceDeployFailure
mock_validate.side_effect = exception.InvalidParameterValue('error') mock_validate.side_effect = exception.InvalidParameterValue('error')
@ -2389,6 +2391,7 @@ class DoNodeTearDownTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
@mock.patch('ironic.drivers.modules.fake.FakePower.validate', @mock.patch('ironic.drivers.modules.fake.FakePower.validate',
autospec=True) autospec=True)
def test_do_node_tear_down_validate_fail(self, mock_validate): def test_do_node_tear_down_validate_fail(self, mock_validate):
self._start_service()
# InvalidParameterValue should be re-raised as InstanceDeployFailure # InvalidParameterValue should be re-raised as InstanceDeployFailure
mock_validate.side_effect = exception.InvalidParameterValue('error') mock_validate.side_effect = exception.InvalidParameterValue('error')
node = obj_utils.create_test_node( node = obj_utils.create_test_node(
@ -8374,7 +8377,6 @@ class NodeHistoryRecordCleanupTestCase(mgr_utils.ServiceSetUpMixin,
# 9 retained due to days, 3 to config # 9 retained due to days, 3 to config
self.service._manage_node_history(self.context) self.service._manage_node_history(self.context)
events = objects.NodeHistory.list(self.context) events = objects.NodeHistory.list(self.context)
print(events)
self.assertEqual(12, len(events)) self.assertEqual(12, len(events))
events = objects.NodeHistory.list_by_node_id(self.context, 10) events = objects.NodeHistory.list_by_node_id(self.context, 10)
self.assertEqual(4, len(events)) self.assertEqual(4, len(events))
@ -8394,3 +8396,73 @@ class NodeHistoryRecordCleanupTestCase(mgr_utils.ServiceSetUpMixin,
self.assertEqual('one', events[1].event) self.assertEqual('one', events[1].event)
self.assertEqual('two', events[2].event) self.assertEqual('two', events[2].event)
self.assertEqual('three', events[3].event) self.assertEqual('three', events[3].event)
class ConcurrentActionLimitTestCase(mgr_utils.ServiceSetUpMixin,
db_base.DbTestCase):
def setUp(self):
super(ConcurrentActionLimitTestCase, self).setUp()
self._start_service()
self.node1 = obj_utils.get_test_node(
self.context,
driver='fake-hardware',
id=110,
uuid=uuidutils.generate_uuid())
self.node2 = obj_utils.get_test_node(
self.context,
driver='fake-hardware',
id=111,
uuid=uuidutils.generate_uuid())
self.node3 = obj_utils.get_test_node(
self.context,
driver='fake-hardware',
id=112,
uuid=uuidutils.generate_uuid())
self.node4 = obj_utils.get_test_node(
self.context,
driver='fake-hardware',
id=113,
uuid=uuidutils.generate_uuid())
# Create the nodes, as the tasks need to operate across tables.
self.node1.create()
self.node2.create()
self.node3.create()
self.node4.create()
def test_concurrent_action_limit_deploy(self):
self.node1.provision_state = states.DEPLOYING
self.node2.provision_state = states.DEPLOYWAIT
self.node1.save()
self.node2.save()
CONF.set_override('max_concurrent_deploy', 2, group='conductor')
self.assertRaises(
exception.ConcurrentActionLimit,
self.service._concurrent_action_limit,
'provisioning')
self.service._concurrent_action_limit('unprovisioning')
self.service._concurrent_action_limit('cleaning')
CONF.set_override('max_concurrent_deploy', 3, group='conductor')
self.service._concurrent_action_limit('provisioning')
def test_concurrent_action_limit_cleaning(self):
self.node1.provision_state = states.DELETING
self.node2.provision_state = states.CLEANING
self.node3.provision_state = states.CLEANWAIT
self.node1.save()
self.node2.save()
self.node3.save()
CONF.set_override('max_concurrent_clean', 3, group='conductor')
self.assertRaises(
exception.ConcurrentActionLimit,
self.service._concurrent_action_limit,
'cleaning')
self.assertRaises(
exception.ConcurrentActionLimit,
self.service._concurrent_action_limit,
'unprovisioning')
self.service._concurrent_action_limit('provisioning')
CONF.set_override('max_concurrent_clean', 4, group='conductor')
self.service._concurrent_action_limit('cleaning')
self.service._concurrent_action_limit('unprovisioning')

View File

@ -1081,3 +1081,39 @@ class DbNodeTestCase(base.DbTestCase):
self.dbapi.check_node_list, self.dbapi.check_node_list,
[node1.uuid, 'this/cannot/be/a/name']) [node1.uuid, 'this/cannot/be/a/name'])
self.assertIn('this/cannot/be/a/name', str(exc)) self.assertIn('this/cannot/be/a/name', str(exc))
def test_node_provision_state_count(self):
active_nodes = 5
manageable_nodes = 3
deploywait_nodes = 1
for i in range(0, active_nodes):
utils.create_test_node(uuid=uuidutils.generate_uuid(),
provision_state=states.ACTIVE)
for i in range(0, manageable_nodes):
utils.create_test_node(uuid=uuidutils.generate_uuid(),
provision_state=states.MANAGEABLE)
for i in range(0, deploywait_nodes):
utils.create_test_node(uuid=uuidutils.generate_uuid(),
provision_state=states.DEPLOYWAIT)
self.assertEqual(
active_nodes,
self.dbapi.count_nodes_in_provision_state(states.ACTIVE)
)
self.assertEqual(
manageable_nodes,
self.dbapi.count_nodes_in_provision_state(states.MANAGEABLE)
)
self.assertEqual(
deploywait_nodes,
self.dbapi.count_nodes_in_provision_state(states.DEPLOYWAIT)
)
total = active_nodes + manageable_nodes + deploywait_nodes
self.assertEqual(
total,
self.dbapi.count_nodes_in_provision_state([
states.ACTIVE,
states.MANAGEABLE,
states.DEPLOYWAIT
])
)

View File

@ -0,0 +1,23 @@
---
features:
- |
Adds a concurrency limiter for number of nodes in states related to
*Cleaning* and *Provisioning* operations across the ironic deployment.
These settings default to a maximum number of concurrent deployments to
``250`` and a maximum number of concurrent deletes and cleaning operations
to ``50``. These settings can be tuned using
``[conductor]max_concurrent_deploy`` and
``[conductor]max_concurrent_clean``, respectively.
The defaults should generally be good for most operators in most cases.
Large scale operators should evaluate the defaults and tune appropriately
as this feature cannot be disabled, as it is a security mechanism.
upgrade:
- |
Large scale operators should be aware that a new feature, referred to as
"Concurrent Action Limit" was introduced as a security mechanism to
provide a means to limit attackers, or faulty scripts, from potentially
causing irreperable harm to an environment. This feature cannot be
disabled, and operators are encouraged to tune the new settings
``[conductor]max_concurrent_deploy`` and
``[conductor]max_concurrent_clean`` to match the needs of their
environment.