Creates multiple worker processes for the API server

This change to the WSGI code uses openstack.common.service to create
multiple worker processes to handle API load.  The main process starts
a configurable number (api_workers) of child processes, all of which
listen on the bind port.  The main process becomes the parent and
manages the children; the parent itself is not a worker.

Backwards compatibility is preserved by the default, api_workers = 0:
no separate worker processes are spawned, and the worker threads run in
the main process as before.
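
For illustration only (no such code is part of this change), the pre-fork
pattern being adopted looks roughly like the sketch below, written against
the standard library (plain os.fork() and an arbitrary port) rather than
the ProcessLauncher from openstack.common.service that the real code uses:

    import os
    import socket

    WORKERS = 2  # plays the role of api_workers > 0

    # The parent binds the socket once, before forking, so every child can
    # accept() connections on the same listening port.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("0.0.0.0", 9696))  # port chosen only for illustration
    sock.listen(128)

    children = []
    for _ in range(WORKERS):
        pid = os.fork()
        if pid == 0:
            # Child process: acts as a worker, serving the shared socket.
            while True:
                conn, _addr = sock.accept()
                conn.sendall(b"handled by pid %d\n" % os.getpid())
                conn.close()
        children.append(pid)

    # Parent process: not a worker, it only supervises its children.
    for pid in children:
        os.waitpid(pid, 0)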

Implement blueprint multi-workers-for-api-server

Change-Id: Iffa76041d0055840ccca852814b0e71f17a950ac
Carl Baldwin 2013-07-12 20:45:38 +00:00
parent 3468a03f42
commit 3d669cbd0e
4 changed files with 72 additions and 5 deletions

etc/neutron.conf

@@ -239,6 +239,10 @@ notification_driver = neutron.openstack.common.notifier.rpc_notifier
 # =========== end of items for agent scheduler extension =====
 
 # =========== WSGI parameters related to the API server ==============
+# Number of separate worker processes to spawn. The default, 0, runs the
+# worker thread in the current process. Greater than 0 launches that number of
+# child processes as workers. The parent process manages them.
+# api_workers = 0
 # Sets the value of TCP_KEEPIDLE in seconds to use for each server socket when
 # starting API server. Not supported on OS X.
 # tcp_keepidle = 600
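
For example, an operator who wants four API workers would set, in the
configuration file excerpted above (the value 4 is purely illustrative):

    api_workers = 4

With that setting the parent process forks four children, each accepting
requests on the shared listening socket, while the parent only supervises.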

neutron/service.py

@@ -36,6 +36,9 @@ service_opts = [
     cfg.IntOpt('periodic_interval',
                default=40,
                help=_('Seconds between running periodic tasks')),
+    cfg.IntOpt('api_workers',
+               default=0,
+               help=_('Number of separate worker processes for service')),
     cfg.IntOpt('periodic_fuzzy_delay',
                default=5,
                help=_('Range of seconds to randomly delay when starting the '
@@ -111,7 +114,8 @@ def _run_wsgi(app_name):
         LOG.error(_('No known API applications configured.'))
         return
     server = wsgi.Server("Neutron")
-    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host)
+    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
+                 workers=cfg.CONF.api_workers)
     # Dump all option values here after all options are parsed
     cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
     LOG.info(_("Neutron service started, listening on %(host)s:%(port)s"),

neutron/tests/unit/test_wsgi.py

@@ -47,6 +47,22 @@ class TestWSGIServer(base.BaseTestCase):
         server.stop()
         server.wait()
 
+    @mock.patch('neutron.wsgi.ProcessLauncher')
+    def test_start_multiple_workers(self, ProcessLauncher):
+        launcher = ProcessLauncher.return_value
+
+        server = wsgi.Server("test_multiple_processes")
+        server.start(None, 0, host="127.0.0.1", workers=2)
+        launcher.running = True
+        launcher.launch_service.assert_called_once_with(server._server,
+                                                        workers=2)
+
+        server.stop()
+        self.assertFalse(launcher.running)
+
+        server.wait()
+        launcher.wait.assert_called_once_with()
+
     def test_start_random_port_with_ipv6(self):
         server = wsgi.Server("test_random_port")
         server.start(None, 0, host="::1")

neutron/wsgi.py

@@ -37,9 +37,11 @@ import webob.exc
 from neutron.common import constants
 from neutron.common import exceptions as exception
 from neutron import context
+from neutron.openstack.common.db.sqlalchemy import session
 from neutron.openstack.common import gettextutils
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common import log as logging
+from neutron.openstack.common.service import ProcessLauncher
 
 socket_opts = [
     cfg.IntOpt('backlog',
@@ -82,12 +84,39 @@ def run_server(application, port):
     eventlet.wsgi.server(sock, application)
 
 
+class WorkerService(object):
+    """Wraps a worker to be handled by ProcessLauncher"""
+    def __init__(self, service, application):
+        self._service = service
+        self._application = application
+        self._server = None
+
+    def start(self):
+        # We may have just forked from parent process. A quick disposal of the
+        # existing sql connections avoids producing 500 errors later when they
+        # are discovered to be broken.
+        session.get_engine(sqlite_fk=True).pool.dispose()
+        self._server = self._service.pool.spawn(self._service._run,
+                                                self._application,
+                                                self._service._socket)
+
+    def wait(self):
+        self._service.pool.waitall()
+
+    def stop(self):
+        if isinstance(self._server, eventlet.greenthread.GreenThread):
+            self._server.kill()
+            self._server = None
+
+
 class Server(object):
     """Server class to manage multiple WSGI sockets and applications."""
 
     def __init__(self, name, threads=1000):
         self.pool = eventlet.GreenPool(threads)
         self.name = name
+        self._launcher = None
+        self._server = None
 
     def _get_socket(self, host, port, backlog):
         bind_addr = (host, port)
@@ -166,7 +195,7 @@ class Server(object):
 
         return sock
 
-    def start(self, application, port, host='0.0.0.0'):
+    def start(self, application, port, host='0.0.0.0', workers=0):
         """Run a WSGI server with the given application."""
         self._host = host
         self._port = port
@@ -175,7 +204,14 @@
         self._socket = self._get_socket(self._host,
                                         self._port,
                                         backlog=backlog)
-        self._server = self.pool.spawn(self._run, application, self._socket)
+        if workers < 1:
+            # For the case where only one process is required.
+            self._server = self.pool.spawn(self._run, application,
+                                           self._socket)
+        else:
+            self._launcher = ProcessLauncher()
+            self._server = WorkerService(self, application)
+            self._launcher.launch_service(self._server, workers=workers)
 
     @property
     def host(self):
@@ -186,12 +222,19 @@
         return self._socket.getsockname()[1] if self._socket else self._port
 
     def stop(self):
-        self._server.kill()
+        if self._launcher:
+            # The process launcher does not support stop or kill.
+            self._launcher.running = False
+        else:
+            self._server.kill()
 
     def wait(self):
         """Wait until all servers have completed running."""
         try:
-            self.pool.waitall()
+            if self._launcher:
+                self._launcher.wait()
+            else:
+                self.pool.waitall()
         except KeyboardInterrupt:
             pass
 
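
As an aside, the session.get_engine(...).pool.dispose() call in
WorkerService.start() above exists because database connections created
before the fork must not be reused by the children. A standalone
SQLAlchemy sketch of the same idea (illustrative only, not neutron code;
the in-memory SQLite URL is a placeholder):

    import os

    import sqlalchemy

    # Parent creates an engine and checks a connection out of (and back
    # into) the pool before forking.
    engine = sqlalchemy.create_engine("sqlite://")  # placeholder URL
    engine.connect().close()

    pid = os.fork()
    if pid == 0:
        # Child: discard connections inherited from the parent so that
        # SQLAlchemy opens fresh ones; reusing inherited connections is
        # what produces the 500 errors mentioned in the comment above.
        engine.pool.dispose()
        with engine.connect() as conn:
            conn.execute(sqlalchemy.text("SELECT 1"))
        os._exit(0)
    else:
        os.waitpid(pid, 0)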