Modify api and rpc default number of workers

- Limit number of api workers to roughly using half of system
  RAM. Spawning a bunch, just to have the OOM killer nuke them
  regularly is not useful.
- Bump the rpc_workers default to half of the api_workers.
  A default of 1 falls behind on any reasonably sized node.

Change-Id: I8b84a359f83133014b3d4414aafc10e6b7c6a876
Closes-bug: #1815629
This commit is contained in:
Doug Wiegley 2019-02-12 08:47:19 -07:00
parent 418e3f398b
commit 7e09b25b96
No known key found for this signature in database
GPG Key ID: 4D3C112B76BBDB5F
9 changed files with 156 additions and 27 deletions

View File

@ -127,3 +127,25 @@ serve this job:
# /usr/bin/neutron-rpc-server --config-file /etc/neutron/neutron.conf --config-file /etc/neutron/plugins/ml2/ml2_conf.ini # /usr/bin/neutron-rpc-server --config-file /etc/neutron/neutron.conf --config-file /etc/neutron/plugins/ml2/ml2_conf.ini
.. end .. end
Neutron Worker Processes
------------------------
Neutron will attempt to spawn a number of child processes for handling API
and RPC requests. The number of API workers is set to the number of CPU
cores, further limited by available memory, and the number of RPC workers
is set to half that number.
It is strongly recommended that all deployers set these values themselves,
via the api_workers and rpc_workers configuration parameters.
For a cloud with a high load to a relatively small number of objects,
a smaller value for api_workers will provide better performance than
many (somewhere around 4-8.) For a cloud with a high load to lots of
different objects, then the more the better. Budget neutron-server
using about 2GB of RAM in steady-state.
For rpc_workers, there needs to be enough to keep up with incoming
events from the various neutron agents. Signs that there are too few
can be agent heartbeats arriving late, nova vif bindings timing out
on the hypervisors, or rpc message timeout exceptions in agent logs.

View File

@ -18,6 +18,7 @@ from oslo_log import log as logging
from oslo_upgradecheck import upgradecheck from oslo_upgradecheck import upgradecheck
from neutron.conf import common as neutron_conf_base from neutron.conf import common as neutron_conf_base
from neutron.conf import service as neutron_conf_service
CHECKS_ENTRYPOINTS = 'neutron.status.upgrade.checks' CHECKS_ENTRYPOINTS = 'neutron.status.upgrade.checks'
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -50,6 +51,8 @@ def setup_conf(conf=cfg.CONF):
""" """
neutron_conf_base.register_core_common_config_opts(conf) neutron_conf_base.register_core_common_config_opts(conf)
neutron_conf_service.register_service_opts(
neutron_conf_service.service_opts, cfg.CONF)
return conf return conf

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from oslo_config import cfg
from oslo_upgradecheck import upgradecheck from oslo_upgradecheck import upgradecheck
from neutron._i18n import _ from neutron._i18n import _
@ -22,12 +23,19 @@ class CoreChecks(base.BaseChecks):
def get_checks(self): def get_checks(self):
return [ return [
(_("Check nothing"), self.noop_check) (_("Worker counts configured"), self.worker_count_check)
] ]
@staticmethod @staticmethod
def noop_check(checker): def worker_count_check(checker):
# NOTE(slaweq) This is only example Noop check, it can be removed when
# some real check methods will be added if cfg.CONF.api_workers and cfg.CONF.rpc_workers:
return upgradecheck.Result( return upgradecheck.Result(
upgradecheck.Code.SUCCESS, _("Always succeed (placeholder)")) upgradecheck.Code.SUCCESS, _("Number of workers already "
"defined in config"))
else:
return upgradecheck.Result(
upgradecheck.Code.WARNING, _("The default number of workers "
"has changed. Please see release notes for the new values, "
"but it is strongly encouraged for deployers to manually set "
"the values for api_workers and rpc_workers."))

View File

@ -26,10 +26,12 @@ service_opts = [
cfg.IntOpt('api_workers', cfg.IntOpt('api_workers',
help=_('Number of separate API worker processes for service. ' help=_('Number of separate API worker processes for service. '
'If not specified, the default is equal to the number ' 'If not specified, the default is equal to the number '
'of CPUs available for best performance.')), 'of CPUs available for best performance, capped by '
'potential RAM usage.')),
cfg.IntOpt('rpc_workers', cfg.IntOpt('rpc_workers',
default=1, help=_('Number of RPC worker processes for service. '
help=_('Number of RPC worker processes for service.')), 'If not specified, the default is equal to half the '
'number of API workers.')),
cfg.IntOpt('rpc_state_report_workers', cfg.IntOpt('rpc_state_report_workers',
default=1, default=1,
help=_('Number of RPC worker processes dedicated to state ' help=_('Number of RPC worker processes dedicated to state '

View File

@ -33,6 +33,7 @@ from oslo_service import loopingcall
from oslo_service import service as common_service from oslo_service import service as common_service
from oslo_utils import excutils from oslo_utils import excutils
from oslo_utils import importutils from oslo_utils import importutils
import psutil
from neutron.common import config from neutron.common import config
from neutron.common import profiler from neutron.common import profiler
@ -148,28 +149,51 @@ class RpcReportsWorker(RpcWorker):
start_listeners_method = 'start_rpc_state_reports_listener' start_listeners_method = 'start_rpc_state_reports_listener'
def _get_rpc_workers(): def _get_worker_count():
# Start with the number of CPUs
num_workers = processutils.get_worker_count()
# Now don't use more than half the system memory, assuming
# a steady-state bloat of around 2GB.
mem = psutil.virtual_memory()
mem_workers = int(mem.total / (2 * 1024 * 1024 * 1024))
if mem_workers < num_workers:
num_workers = mem_workers
# And just in case, always at least one.
if num_workers <= 0:
num_workers = 1
return num_workers
def _get_rpc_workers(plugin=None):
if plugin is None:
plugin = directory.get_plugin() plugin = directory.get_plugin()
service_plugins = directory.get_plugins().values() service_plugins = directory.get_plugins().values()
if cfg.CONF.rpc_workers < 1: workers = cfg.CONF.rpc_workers
cfg.CONF.set_override('rpc_workers', 1) if workers is None:
# By default, half as many rpc workers as api workers
workers = int(_get_worker_count() / 2)
if workers < 1:
workers = 1
# If 0 < rpc_workers then start_rpc_listeners would be called in a # If workers > 0 then start_rpc_listeners would be called in a
# subprocess and we cannot simply catch the NotImplementedError. It is # subprocess and we cannot simply catch the NotImplementedError. It is
# simpler to check this up front by testing whether the plugin supports # simpler to check this up front by testing whether the plugin supports
# multiple RPC workers. # multiple RPC workers.
if not plugin.rpc_workers_supported(): if not plugin.rpc_workers_supported():
LOG.debug("Active plugin doesn't implement start_rpc_listeners") LOG.debug("Active plugin doesn't implement start_rpc_listeners")
if 0 < cfg.CONF.rpc_workers: if workers > 0:
LOG.error("'rpc_workers = %d' ignored because " LOG.error("'rpc_workers = %d' ignored because "
"start_rpc_listeners is not implemented.", "start_rpc_listeners is not implemented.",
cfg.CONF.rpc_workers) workers)
raise NotImplementedError() raise NotImplementedError()
# passing service plugins only, because core plugin is among them # passing service plugins only, because core plugin is among them
rpc_workers = [RpcWorker(service_plugins, rpc_workers = [RpcWorker(service_plugins,
worker_process_count=cfg.CONF.rpc_workers)] worker_process_count=workers)]
if (cfg.CONF.rpc_state_report_workers > 0 and if (cfg.CONF.rpc_state_report_workers > 0 and
plugin.rpc_state_report_workers_supported()): plugin.rpc_state_report_workers_supported()):
@ -283,7 +307,7 @@ def start_plugins_workers():
def _get_api_workers(): def _get_api_workers():
workers = cfg.CONF.api_workers workers = cfg.CONF.api_workers
if workers is None: if workers is None:
workers = processutils.get_worker_count() workers = _get_worker_count()
return workers return workers

View File

@ -24,7 +24,8 @@ from neutron.tests.functional import test_server
class TestService(base.BaseLoggingTestCase): class TestService(base.BaseLoggingTestCase):
def test_api_workers_default(self): def test_api_workers_default(self):
self.assertEqual(processutils.get_worker_count(), # This value may end being scaled downward based on available RAM.
self.assertGreaterEqual(processutils.get_worker_count(),
neutron_service._get_api_workers()) neutron_service._get_api_workers())
def test_api_workers_from_config(self): def test_api_workers_from_config(self):

View File

@ -13,6 +13,7 @@
# under the License. # under the License.
import mock import mock
from oslo_config import cfg
from oslo_upgradecheck.upgradecheck import Code from oslo_upgradecheck.upgradecheck import Code
from neutron.cmd.upgrade_checks import checks from neutron.cmd.upgrade_checks import checks
@ -28,6 +29,26 @@ class TestChecks(base.BaseTestCase):
def test_get_checks_list(self): def test_get_checks_list(self):
self.assertIsInstance(self.checks.get_checks(), list) self.assertIsInstance(self.checks.get_checks(), list)
def test_noop_check(self): def test_worker_check_good(self):
check_result = checks.CoreChecks.noop_check(mock.Mock()) cfg.CONF.set_override("api_workers", 2)
self.assertEqual(Code.SUCCESS, check_result.code) cfg.CONF.set_override("rpc_workers", 2)
result = checks.CoreChecks.worker_count_check(mock.Mock())
self.assertEqual(Code.SUCCESS, result.code)
def test_worker_check_api_missing(self):
cfg.CONF.set_override("api_workers", None)
cfg.CONF.set_override("rpc_workers", 2)
result = checks.CoreChecks.worker_count_check(mock.Mock())
self.assertEqual(Code.WARNING, result.code)
def test_worker_check_rpc_missing(self):
cfg.CONF.set_override("api_workers", 2)
cfg.CONF.set_override("rpc_workers", None)
result = checks.CoreChecks.worker_count_check(mock.Mock())
self.assertEqual(Code.WARNING, result.code)
def test_worker_check_both_missing(self):
cfg.CONF.set_override("api_workers", None)
cfg.CONF.set_override("rpc_workers", None)
result = checks.CoreChecks.worker_count_check(mock.Mock())
self.assertEqual(Code.WARNING, result.code)

View File

@ -18,6 +18,7 @@ import mock
from neutron_lib.callbacks import events from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources from neutron_lib.callbacks import resources
from oslo_concurrency import processutils
from oslo_config import cfg from oslo_config import cfg
from neutron import service from neutron import service
@ -25,6 +26,14 @@ from neutron.tests import base
from neutron.tests.unit import test_wsgi from neutron.tests.unit import test_wsgi
class TestServiceHelpers(base.BaseTestCase):
def test_get_workers(self):
num_workers = service._get_worker_count()
self.assertGreaterEqual(num_workers, 1)
self.assertLessEqual(num_workers, processutils.get_worker_count())
class TestRpcWorker(test_wsgi.TestServiceBase): class TestRpcWorker(test_wsgi.TestServiceBase):
def test_reset(self): def test_reset(self):
@ -33,12 +42,36 @@ class TestRpcWorker(test_wsgi.TestServiceBase):
self._test_reset(rpc_worker) self._test_reset(rpc_worker)
class TestRunRpcWorkers(base.BaseTestCase):
def setUp(self):
super(TestRunRpcWorkers, self).setUp()
self.worker_count = service._get_worker_count()
def _test_rpc_workers(self, config_value, expected_passed_value):
if config_value is not None:
cfg.CONF.set_override('rpc_workers', config_value)
with mock.patch('neutron.service.RpcWorker') as mock_rpc_worker:
with mock.patch('neutron.service.RpcReportsWorker'):
service._get_rpc_workers(plugin=mock.Mock())
init_call = mock_rpc_worker.call_args
expected_call = mock.call(
mock.ANY, worker_process_count=expected_passed_value)
self.assertEqual(expected_call, init_call)
def test_rpc_workers_zero(self):
self._test_rpc_workers(0, 1)
def test_rpc_workers_default(self):
self._test_rpc_workers(None, int(self.worker_count / 2))
def test_rpc_workers_defined(self):
self._test_rpc_workers(42, 42)
class TestRunWsgiApp(base.BaseTestCase): class TestRunWsgiApp(base.BaseTestCase):
def setUp(self): def setUp(self):
super(TestRunWsgiApp, self).setUp() super(TestRunWsgiApp, self).setUp()
self.processor_count = mock.patch( self.worker_count = service._get_worker_count()
'oslo_concurrency.processutils.get_worker_count'
).start().return_value
def _test_api_workers(self, config_value, expected_passed_value): def _test_api_workers(self, config_value, expected_passed_value):
if config_value is not None: if config_value is not None:
@ -54,7 +87,7 @@ class TestRunWsgiApp(base.BaseTestCase):
self._test_api_workers(0, 0) self._test_api_workers(0, 0)
def test_api_workers_default(self): def test_api_workers_default(self):
self._test_api_workers(None, self.processor_count) self._test_api_workers(None, self.worker_count)
def test_api_workers_defined(self): def test_api_workers_defined(self):
self._test_api_workers(42, 42) self._test_api_workers(42, 42)

View File

@ -0,0 +1,15 @@
upgrade:
- The number of api and rpc workers may change on upgrade.
It is strongly recommended that all deployers set these
values in their neutron configurations, rather than
using the defaults.
fixes:
- Neutron API workers default to the number of CPU cores.
This can lead to high cpu/low memory boxes getting into
trouble. The defaults have been tweaked to attempt to
put an upper bound on the default of either the number
of cores, or half of system memory, whichever is lower.
In addition, the default number of RPC workers has been
changed from a value of ``1``, to a value of half the
number of API workers.