enable multiple keystone-all worker processes

Fixes bug 1157261.

Since the majority of the work keystone does is cryptographic
calculations and filtering database records, keystone is CPU-bound.
Given that a keystone-all process has only one thread (i.e.,
eventlet's thread), keystone-all's throughput is limited to the
throughput of a single CPU core. To increase keystone-all's
throughput, we need to increase its CPU parallelism, which entails
running more keystone-all processes.

This patch adds two configuration options, public_workers=N and
admin_workers=N, that determine the number of keystone-all processes
that handle requests for keystone's public and admin WSGI applications
respectively.

Note that simply running keystone-all multiple times won't work
because care has to be taken for all of the worker processes to be
using the same socket (i.e., listen() then fork()).

DocImpact

Change-Id: If74f13bc2898e880649ee809967f5b5859b793c6
Co-Authored-By: Stuart McLaren <stuart.mclaren@hp.com>
This commit is contained in:
Peter Feiner 2013-08-20 18:24:37 +00:00 committed by Stuart McLaren
parent c940835494
commit 3580c2af1b
4 changed files with 66 additions and 17 deletions

View File

@ -16,7 +16,6 @@
import logging
import os
import signal
import socket
import sys
@ -48,13 +47,31 @@ from keystone.common import sql
from keystone.common import utils
from keystone import config
from keystone.openstack.common.gettextutils import _
from keystone.openstack.common import service
from keystone.openstack.common import systemd
CONF = config.CONF
def create_server(conf, name, host, port):
class ServerWrapper(object):
    """Pairs a Server with the worker count used to launch it."""

    def __init__(self, server, workers):
        self.server = server
        self.workers = workers

    def launch_with(self, launcher):
        """Bind the server's listening socket, then hand it to *launcher*."""
        self.server.listen()
        # The single-process launcher takes just the service; the
        # multi-process launcher also takes the number of workers.
        if self.workers <= 1:
            launcher.launch_service(self.server)
        else:
            launcher.launch_service(self.server, self.workers)
def create_server(conf, name, host, port, workers):
app = deploy.loadapp('config:%s' % conf, name=name)
server = environment.Server(app, host=host, port=port,
keepalive=CONF.tcp_keepalive,
@ -62,21 +79,18 @@ def create_server(conf, name, host, port):
if CONF.ssl.enable:
server.set_ssl(CONF.ssl.certfile, CONF.ssl.keyfile,
CONF.ssl.ca_certs, CONF.ssl.cert_required)
return name, server
def sigint_handler(signal, frame):
    """Terminate the process cleanly when SIGINT is received."""
    # Log before exiting so operators can see why the servers stopped.
    logging.debug('SIGINT received, stopping servers.')
    raise SystemExit(0)
return name, ServerWrapper(server, workers)
def serve(*servers):
signal.signal(signal.SIGINT, sigint_handler)
if max([server[1].workers for server in servers]) > 1:
launcher = service.ProcessLauncher()
else:
launcher = service.ServiceLauncher()
for name, server in servers:
try:
server.start()
server.launch_with(launcher)
except socket.error:
logging.exception(_('Failed to start the %(name)s server') % {
'name': name})
@ -86,7 +100,7 @@ def serve(*servers):
systemd.notify_once()
for name, server in servers:
server.wait()
launcher.wait()
if __name__ == '__main__':
@ -129,11 +143,13 @@ if __name__ == '__main__':
servers.append(create_server(paste_config,
'admin',
CONF.admin_bind_host,
int(CONF.admin_port)))
int(CONF.admin_port),
CONF.admin_workers))
servers.append(create_server(paste_config,
'main',
CONF.public_bind_host,
int(CONF.public_port)))
int(CONF.public_port),
CONF.public_workers))
dependency.resolve_future_dependencies()
serve(*servers)

View File

@ -69,6 +69,12 @@ FILE_OPTIONS = {
'to set this value if the base URL contains a path '
'(e.g. /prefix/v2.0) or the endpoint should be found '
'on a different server.'),
cfg.IntOpt('public_workers', default=1,
help='The number of worker processes to serve the public '
'WSGI application'),
cfg.IntOpt('admin_workers', default=1,
help='The number of worker processes to serve the admin '
'WSGI application'),
# default max request size is 112k
cfg.IntOpt('max_request_body_size', default=114688,
help='Enforced by optional sizelimit middleware '

View File

@ -46,9 +46,29 @@ class Server(object):
self.cert_required = False
self.keepalive = keepalive
self.keepidle = keepidle
self.socket = None
def start(self, key=None, backlog=128):
    """Run a WSGI server with the given application.

    If listen() has not been called yet, the listening socket is bound
    first; otherwise the already-bound socket is reused (this is what
    allows listen()-then-fork() multi-worker setups, where the parent
    binds once before starting workers).

    :param key: forwarded to listen() when the socket is not yet bound.
    :param backlog: forwarded to listen() as the socket listen backlog.
    """
    # Only bind on first start; self.socket is set by listen().
    if self.socket is None:
        self.listen(key=key, backlog=backlog)
    # Serve requests on a green thread from the pool so start() returns
    # immediately; _run drives the actual request loop.
    self.greenthread = self.pool.spawn(self._run,
                                       self.application,
                                       self.socket)
def listen(self, key=None, backlog=128):
"""Create and start listening on socket.
Call before forking worker processes.
Raises Exception if this has already been called.
"""
if self.socket is not None:
raise Exception(_('Server can only listen once.'))
LOG.info(_('Starting %(arg0)s on %(host)s:%(port)s'),
{'arg0': sys.argv[0],
'host': self.host,
@ -88,9 +108,7 @@ class Server(object):
_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
self.keepidle)
self.greenthread = self.pool.spawn(self._run,
self.application,
_socket)
self.socket = _socket
def set_ssl(self, certfile, keyfile=None, ca_certs=None,
cert_required=True):
@ -104,6 +122,9 @@ class Server(object):
if self.greenthread is not None:
self.greenthread.kill()
def stop(self):
    """Stop the server.

    Alias for kill() — presumably provided so this server satisfies the
    service interface expected by the launcher (TODO confirm against
    the launcher's contract).
    """
    self.kill()
def wait(self):
"""Wait until all servers have completed running."""
try:
@ -119,6 +140,9 @@ class Server(object):
try:
eventlet.wsgi.server(socket, application, custom_pool=self.pool,
log=log.WritableLogger(logger), debug=False)
except greenlet.GreenletExit:
# Wait until all servers have completed running
pass
except Exception:
LOG.exception(_('Server error'))
raise

View File

@ -329,6 +329,9 @@ class TestCase(BaseTestCase):
return copy.copy(self._config_file_list)
def config_overrides(self):
# Exercise multiple worker process code paths
self.config_fixture.config(public_workers=2)
self.config_fixture.config(admin_workers=2)
self.config_fixture.config(policy_file=dirs.etc('policy.json'))
self.config_fixture.config(
group='auth',