From c1b2c362f085be5f16bd6567d408892fa0205396 Mon Sep 17 00:00:00 2001 From: Koichiro Den Date: Tue, 13 Apr 2021 10:38:10 +0900 Subject: [PATCH] Fix two-stage process launcher in tacker-server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit tacker-server has had a two-stage process launcher for a long time. While it doesn't matter if {api_workers} is set to one or unset, which is the case by default on installation via devstack, it results in an odd situation in other cases as follows: 1). surprisingly the number of workers was not {api_workers} but {api_workers}*{api_workers}. Notice that the launcher processes are also present so the resulting number of tacker-server processes was 1+{api_workers}*({api_workers}+1). E.g., when {api_workers}=100, the number of tacker-server processes is 10101. 2). there is one GreenPool per second-stage launcher. That means {api_workers} number of workers, each of which is a green thread on a separate process (spawned by ProcessLauncher), share the same pool. It means that the second stage launcher not only fails to provide performance scaling, but is also more likely to unnecessarily keep some workers starved due to a process which starts sleeping before yielding. This patch fixes it so that there is just a one-stage launcher. [Before this patch] master ├── worker#0 # second-stage launcher   │   ├── worker#0-0 (*1)   │   ├── worker#0-1 (*1)   │ : │   └── worker#0-{api_workers-1} (*1) ├── worker#1 # second-stage launcher : └── worker#{api_workers-1}   :    └── worker#{api_workers-1}-{api_workers-1} (*1) [After this patch] master ├── worker#0 (*1) ├── worker#1 (*1) : └── worker#{api_workers-1} (*1) (*1) spawns one green thread at the start. 
Closes-Bug: #1923528 Change-Id: I2bf567a0367659c170af0b618374a51d0f13eece --- tacker/service.py | 3 +- tacker/tests/unit/test_wsgi.py | 16 --------- tacker/wsgi.py | 61 ++++------------------------------ 3 files changed, 8 insertions(+), 72 deletions(-) diff --git a/tacker/service.py b/tacker/service.py index 23632606c..d5d13e260 100644 --- a/tacker/service.py +++ b/tacker/service.py @@ -119,8 +119,7 @@ def _run_wsgi(app_name): LOG.error('No known API applications configured.') return server = wsgi.Server("Tacker") - server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host, - workers=cfg.CONF.api_workers) + server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host) # Dump all option values here after all options are parsed cfg.CONF.log_opt_values(LOG, std_logging.DEBUG) LOG.info("Tacker service started, listening on %(host)s:%(port)s", diff --git a/tacker/tests/unit/test_wsgi.py b/tacker/tests/unit/test_wsgi.py index 568a1cd37..99bb3be61 100644 --- a/tacker/tests/unit/test_wsgi.py +++ b/tacker/tests/unit/test_wsgi.py @@ -44,22 +44,6 @@ class TestWSGIServer(base.BaseTestCase): server.stop() server.wait() - @mock.patch('oslo_service.service.ProcessLauncher') - def test_start_multiple_workers(self, ProcessLauncher): - launcher = ProcessLauncher.return_value - - server = wsgi.Server("test_multiple_processes") - server.start(None, 0, host="127.0.0.1", workers=2) - launcher.running = True - launcher.launch_service.assert_called_once_with(server._server, - workers=2) - - server.stop() - self.assertFalse(launcher.running) - - server.wait() - launcher.wait.assert_called_once_with() - def test_start_random_port_with_ipv6(self): server = wsgi.Server("test_random_port") server.start(None, 0, host="::1") diff --git a/tacker/wsgi.py b/tacker/wsgi.py index 7d64f50ea..4dd6a9f66 100644 --- a/tacker/wsgi.py +++ b/tacker/wsgi.py @@ -33,7 +33,6 @@ import tacker.conf import oslo_i18n as i18n from oslo_log import log as logging from oslo_serialization import jsonutils -from oslo_service 
import service as common_service from oslo_service import systemd from oslo_utils import encodeutils from oslo_utils import excutils @@ -44,7 +43,6 @@ import webob.exc from tacker._i18n import _ from tacker.common import exceptions as exception from tacker import context -from tacker.db import api socket_opts = [ @@ -149,35 +147,6 @@ def expected_errors(errors): return decorator -class WorkerService(common_service.ServiceBase): - """Wraps a worker to be handled by ProcessLauncher.""" - - def __init__(self, service, application): - self._service = service - self._application = application - self._server = None - - def start(self): - # We may have just forked from parent process. A quick disposal of the - # existing sql connections avoids producing 500 errors later when they - # are discovered to be broken. - api.get_engine().pool.dispose() - self._server = self._service.pool.spawn(self._service._run, - self._application, - self._service._socket) - - def wait(self): - self._service.pool.waitall() - - def stop(self): - if isinstance(self._server, eventlet.greenthread.GreenThread): - self._server.kill() - self._server = None - - def reset(self): - pass - - class Server(object): """Server class to manage multiple WSGI sockets and applications.""" @@ -186,7 +155,6 @@ class Server(object): eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line self.pool = eventlet.GreenPool(threads) self.name = name - self._launcher = None self._server = None def _get_socket(self, host, port, backlog): @@ -268,7 +236,7 @@ class Server(object): return sock - def start(self, application, port, host='0.0.0.0', workers=0): + def start(self, application, port, host='0.0.0.0'): """Run a WSGI server with the given application.""" self._host = host self._port = port @@ -277,18 +245,10 @@ class Server(object): self._socket = self._get_socket(self._host, self._port, backlog=backlog) - if workers < 1: - # For the case where only one process is required. 
- self._server = self.pool.spawn(self._run, application, - self._socket) - systemd.notify_once() - else: - # Minimize the cost of checking for child exit by extending the - # wait interval past the default of 0.01s. - self._launcher = common_service.ProcessLauncher( - CONF, wait_interval=1.0, restart_method='mutate') - self._server = WorkerService(self, application) - self._launcher.launch_service(self._server, workers=workers) + # For the case where only one process is required. + self._server = self.pool.spawn(self._run, application, + self._socket) + systemd.notify_once() @property def host(self): @@ -299,19 +259,12 @@ class Server(object): return self._socket.getsockname()[1] if self._socket else self._port def stop(self): - if self._launcher: - # The process launcher does not support stop or kill. - self._launcher.running = False - else: - self._server.kill() + self._server.kill() def wait(self): """Wait until all servers have completed running.""" try: - if self._launcher: - self._launcher.wait() - else: - self.pool.waitall() + self.pool.waitall() except KeyboardInterrupt: pass