Fix two-stage process launcher in tacker-server

tacker-server has got two-stage process launcher since a long time ago.
While it doesn't matter if {api_workers} is set to one or unset, which
is the case by default on installation via devstack, it should have
resulted in an odd situation in other cases as follows:

1). surprisingly the number of workers was not {api_workers} but
    {api_workers}*{api_workers}. Notice that the launcher processes are
    also present so the resulting number of tacker-server processes was
    1+{api_workers}*({api_workers}+1).

    E.g., when {api_workers}=100, the number of tacker-server processes
    is 10101.

2). GreenPool is per the second stage launcher. That means {api_workers}
    number of workers, each of which is a green thread on a seperate
    process (spawned by ProcessLauncher), share the same pool. It means
    that the second stage launcher does not only fail to provide
    performance scaling, but also more likely to unnecessarily keep some
    workers starved due to a process which starts sleeping before
    yielding.

This patch fixes it so that there is just a one-stage launcher.

    [Before this patch]
        master
          ├── worker#0 # second-stage launcher
          │   ├── worker#0-0 (*1)
          │   ├── worker#0-1 (*1)
          │   :
          │   └── worker#0-{api_workers-1} (*1)
          ├── worker#1 # second-stage launcher
          :
          └── worker#{api_workers-1}
              :
              └── worker#{api_workers-1}-{api_workers-1} (*1)

    [After this patch]
        master
          ├── worker#0 (*1)
          ├── worker#1 (*1)
          :
          └── worker#{api_workers-1} (*1)

    (*1) spawns one green thread at the start.

Closes-Bug: #1923528
Change-Id: I2bf567a0367659c170af0b618374a51d0f13eece
This commit is contained in:
Koichiro Den 2021-04-13 10:38:10 +09:00
parent c137b0ddfe
commit c1b2c362f0
3 changed files with 8 additions and 72 deletions

View File

@ -119,8 +119,7 @@ def _run_wsgi(app_name):
LOG.error('No known API applications configured.') LOG.error('No known API applications configured.')
return return
server = wsgi.Server("Tacker") server = wsgi.Server("Tacker")
server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host, server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host)
workers=cfg.CONF.api_workers)
# Dump all option values here after all options are parsed # Dump all option values here after all options are parsed
cfg.CONF.log_opt_values(LOG, std_logging.DEBUG) cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
LOG.info("Tacker service started, listening on %(host)s:%(port)s", LOG.info("Tacker service started, listening on %(host)s:%(port)s",

View File

@ -44,22 +44,6 @@ class TestWSGIServer(base.BaseTestCase):
server.stop() server.stop()
server.wait() server.wait()
@mock.patch('oslo_service.service.ProcessLauncher')
def test_start_multiple_workers(self, ProcessLauncher):
launcher = ProcessLauncher.return_value
server = wsgi.Server("test_multiple_processes")
server.start(None, 0, host="127.0.0.1", workers=2)
launcher.running = True
launcher.launch_service.assert_called_once_with(server._server,
workers=2)
server.stop()
self.assertFalse(launcher.running)
server.wait()
launcher.wait.assert_called_once_with()
def test_start_random_port_with_ipv6(self): def test_start_random_port_with_ipv6(self):
server = wsgi.Server("test_random_port") server = wsgi.Server("test_random_port")
server.start(None, 0, host="::1") server.start(None, 0, host="::1")

View File

@ -33,7 +33,6 @@ import tacker.conf
import oslo_i18n as i18n import oslo_i18n as i18n
from oslo_log import log as logging from oslo_log import log as logging
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
from oslo_service import service as common_service
from oslo_service import systemd from oslo_service import systemd
from oslo_utils import encodeutils from oslo_utils import encodeutils
from oslo_utils import excutils from oslo_utils import excutils
@ -44,7 +43,6 @@ import webob.exc
from tacker._i18n import _ from tacker._i18n import _
from tacker.common import exceptions as exception from tacker.common import exceptions as exception
from tacker import context from tacker import context
from tacker.db import api
socket_opts = [ socket_opts = [
@ -149,35 +147,6 @@ def expected_errors(errors):
return decorator return decorator
class WorkerService(common_service.ServiceBase):
"""Wraps a worker to be handled by ProcessLauncher."""
def __init__(self, service, application):
self._service = service
self._application = application
self._server = None
def start(self):
# We may have just forked from parent process. A quick disposal of the
# existing sql connections avoids producing 500 errors later when they
# are discovered to be broken.
api.get_engine().pool.dispose()
self._server = self._service.pool.spawn(self._service._run,
self._application,
self._service._socket)
def wait(self):
self._service.pool.waitall()
def stop(self):
if isinstance(self._server, eventlet.greenthread.GreenThread):
self._server.kill()
self._server = None
def reset(self):
pass
class Server(object): class Server(object):
"""Server class to manage multiple WSGI sockets and applications.""" """Server class to manage multiple WSGI sockets and applications."""
@ -186,7 +155,6 @@ class Server(object):
eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
self.pool = eventlet.GreenPool(threads) self.pool = eventlet.GreenPool(threads)
self.name = name self.name = name
self._launcher = None
self._server = None self._server = None
def _get_socket(self, host, port, backlog): def _get_socket(self, host, port, backlog):
@ -268,7 +236,7 @@ class Server(object):
return sock return sock
def start(self, application, port, host='0.0.0.0', workers=0): def start(self, application, port, host='0.0.0.0'):
"""Run a WSGI server with the given application.""" """Run a WSGI server with the given application."""
self._host = host self._host = host
self._port = port self._port = port
@ -277,18 +245,10 @@ class Server(object):
self._socket = self._get_socket(self._host, self._socket = self._get_socket(self._host,
self._port, self._port,
backlog=backlog) backlog=backlog)
if workers < 1: # For the case where only one process is required.
# For the case where only one process is required. self._server = self.pool.spawn(self._run, application,
self._server = self.pool.spawn(self._run, application, self._socket)
self._socket) systemd.notify_once()
systemd.notify_once()
else:
# Minimize the cost of checking for child exit by extending the
# wait interval past the default of 0.01s.
self._launcher = common_service.ProcessLauncher(
CONF, wait_interval=1.0, restart_method='mutate')
self._server = WorkerService(self, application)
self._launcher.launch_service(self._server, workers=workers)
@property @property
def host(self): def host(self):
@ -299,19 +259,12 @@ class Server(object):
return self._socket.getsockname()[1] if self._socket else self._port return self._socket.getsockname()[1] if self._socket else self._port
def stop(self): def stop(self):
if self._launcher: self._server.kill()
# The process launcher does not support stop or kill.
self._launcher.running = False
else:
self._server.kill()
def wait(self): def wait(self):
"""Wait until all servers have completed running.""" """Wait until all servers have completed running."""
try: try:
if self._launcher: self.pool.waitall()
self._launcher.wait()
else:
self.pool.waitall()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass