Multi-process Glance API server support.
Implements blueprint multi-process-server. Allows several Glance API worker processes to be started, which can increase performance on machines with more than one CPU. Change-Id: I1cbb48945fd23afd71de3a30b80836b590c023a1
This commit is contained in:
parent
1698eb10b5
commit
e893b248a2
@ -123,6 +123,17 @@ Number of backlog requests to configure the socket with.
|
||||
|
||||
Optional. Default: ``4096``
|
||||
|
||||
* ``workers=PROCESSES``
|
||||
|
||||
Number of Glance API worker processes to start. Each worker
|
||||
process will listen on the same port. Increasing this
|
||||
value may increase performance (especially if using SSL
|
||||
with compression enabled). Typically it is recommended
|
||||
to have one worker process per CPU. The value `0` will
|
||||
prevent any new processes from being created.
|
||||
|
||||
Optional. Default: ``0``
|
||||
|
||||
Configurating SSL Support
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
|
@ -23,6 +23,13 @@ log_file = /var/log/glance/api.log
|
||||
# Backlog requests when creating socket
|
||||
backlog = 4096
|
||||
|
||||
# Number of Glance API worker processes to start.
|
||||
# On machines with more than one CPU increasing this value
|
||||
# may improve performance (especially if using SSL with
|
||||
# compression turned on). It is typically recommended to set
|
||||
# this value to the number of CPUs present on your machine.
|
||||
workers = 0
|
||||
|
||||
# ================= Syslog Options ============================
|
||||
|
||||
# Send logs to syslog (/dev/log) instead of to file specified
|
||||
|
@ -1076,7 +1076,8 @@ class ConfigOpts(object):
|
||||
|
||||
class CommonConfigOpts(ConfigOpts):
|
||||
|
||||
DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
|
||||
DEFAULT_LOG_FORMAT = ('%(asctime)s %(process)d %(levelname)8s '
|
||||
'[%(name)s] %(message)s')
|
||||
DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
common_cli_opts = [
|
||||
|
@ -25,10 +25,13 @@ import datetime
|
||||
import errno
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
|
||||
import eventlet
|
||||
import eventlet.greenio
|
||||
from eventlet.green import socket, ssl
|
||||
import eventlet.wsgi
|
||||
from paste import deploy
|
||||
@ -51,7 +54,9 @@ socket_opts = [
|
||||
cfg.IntOpt('backlog', default=4096),
|
||||
cfg.StrOpt('cert_file'),
|
||||
cfg.StrOpt('key_file'),
|
||||
]
|
||||
]
|
||||
|
||||
workers_opt = cfg.IntOpt('workers', default=0)
|
||||
|
||||
|
||||
class WritableLogger(object):
|
||||
@ -133,7 +138,9 @@ class Server(object):
|
||||
"""Server class to manage multiple WSGI sockets and applications."""
|
||||
|
||||
def __init__(self, threads=1000):
|
||||
self.pool = eventlet.GreenPool(threads)
|
||||
self.threads = threads
|
||||
self.children = []
|
||||
self.running = True
|
||||
|
||||
def start(self, application, conf, default_port):
|
||||
"""
|
||||
@ -143,21 +150,99 @@ class Server(object):
|
||||
:param conf: a cfg.ConfigOpts object
|
||||
:param default_port: Port to bind to if none is specified in conf
|
||||
"""
|
||||
socket = get_socket(conf, default_port)
|
||||
self.pool.spawn_n(self._run, application, socket)
|
||||
def kill_children(*args):
|
||||
"""Kills the entire process group."""
|
||||
self.logger.error(_('SIGTERM received'))
|
||||
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
||||
self.running = False
|
||||
os.killpg(0, signal.SIGTERM)
|
||||
|
||||
def hup(*args):
|
||||
"""
|
||||
Shuts down the server, but allows running requests to complete
|
||||
"""
|
||||
self.logger.error(_('SIGHUP received'))
|
||||
signal.signal(signal.SIGHUP, signal.SIG_IGN)
|
||||
self.running = False
|
||||
|
||||
self.application = application
|
||||
self.sock = get_socket(conf, default_port)
|
||||
conf.register_opt(workers_opt)
|
||||
|
||||
self.logger = logging.getLogger('eventlet.wsgi.server')
|
||||
|
||||
if conf.workers == 0:
|
||||
# Useful for profiling, test, debug etc.
|
||||
self.pool = eventlet.GreenPool(size=self.threads)
|
||||
self.pool.spawn_n(self._single_run, application, self.sock)
|
||||
return
|
||||
|
||||
self.logger.info(_("Starting %d workers") % conf.workers)
|
||||
signal.signal(signal.SIGTERM, kill_children)
|
||||
signal.signal(signal.SIGHUP, hup)
|
||||
while len(self.children) < conf.workers:
|
||||
self.run_child()
|
||||
|
||||
def wait_on_children(self):
|
||||
while self.running:
|
||||
try:
|
||||
pid, status = os.wait()
|
||||
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
|
||||
self.logger.error(_('Removing dead child %s') % pid)
|
||||
self.children.remove(pid)
|
||||
self.run_child()
|
||||
except OSError, err:
|
||||
if err.errno not in (errno.EINTR, errno.ECHILD):
|
||||
raise
|
||||
except KeyboardInterrupt:
|
||||
sys.exit(1)
|
||||
self.logger.info(_('Caught keyboard interrupt. Exiting.'))
|
||||
break
|
||||
eventlet.greenio.shutdown_safe(self.sock)
|
||||
self.sock.close()
|
||||
self.logger.debug(_('Exited'))
|
||||
|
||||
def wait(self):
|
||||
"""Wait until all servers have completed running."""
|
||||
try:
|
||||
self.pool.waitall()
|
||||
if self.children:
|
||||
self.wait_on_children()
|
||||
else:
|
||||
self.pool.waitall()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
def _run(self, application, socket):
|
||||
def run_child(self):
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
signal.signal(signal.SIGHUP, signal.SIG_DFL)
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
self.run_server()
|
||||
self.logger.info(_('Child %d exiting normally') % os.getpid())
|
||||
return
|
||||
else:
|
||||
self.logger.info(_('Started child %s') % pid)
|
||||
self.children.append(pid)
|
||||
|
||||
def run_server(self):
|
||||
"""Run a WSGI server."""
|
||||
eventlet.wsgi.HttpProtocol.default_request_version = "HTTP/1.0"
|
||||
eventlet.hubs.use_hub('poll')
|
||||
eventlet.patcher.monkey_patch(all=False, socket=True)
|
||||
self.pool = eventlet.GreenPool(size=self.threads)
|
||||
try:
|
||||
eventlet.wsgi.server(self.sock, self.application,
|
||||
log=WritableLogger(self.logger), custom_pool=self.pool)
|
||||
except socket.error, err:
|
||||
if err[0] != errno.EINVAL:
|
||||
raise
|
||||
self.pool.waitall()
|
||||
|
||||
def _single_run(self, application, sock):
|
||||
"""Start a WSGI server in a new green thread."""
|
||||
logger = logging.getLogger('eventlet.wsgi.server')
|
||||
eventlet.wsgi.server(socket, application, custom_pool=self.pool,
|
||||
log=WritableLogger(logger))
|
||||
self.logger.info(_("Starting single process server"))
|
||||
eventlet.wsgi.server(sock, application, custom_pool=self.pool,
|
||||
log=WritableLogger(self.logger))
|
||||
|
||||
|
||||
class Middleware(object):
|
||||
|
@ -191,6 +191,7 @@ class ApiServer(Server):
|
||||
self.rbd_store_chunk_size = 4
|
||||
self.delayed_delete = delayed_delete
|
||||
self.owner_is_tenant = True
|
||||
self.workers = 0
|
||||
self.image_cache_dir = os.path.join(self.test_dir,
|
||||
'cache')
|
||||
self.image_cache_driver = 'sqlite'
|
||||
@ -223,6 +224,7 @@ rbd_store_pool = %(rbd_store_pool)s
|
||||
rbd_store_ceph_conf = %(rbd_store_ceph_conf)s
|
||||
delayed_delete = %(delayed_delete)s
|
||||
owner_is_tenant = %(owner_is_tenant)s
|
||||
workers = %(workers)s
|
||||
scrub_time = 5
|
||||
scrubber_datadir = %(scrubber_datadir)s
|
||||
image_cache_dir = %(image_cache_dir)s
|
||||
|
@ -20,7 +20,6 @@
|
||||
import datetime
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
from glance.common import utils
|
||||
from glance.tests import functional
|
||||
|
40
glance/tests/functional/test_multiprocessing.py
Normal file
40
glance/tests/functional/test_multiprocessing.py
Normal file
@ -0,0 +1,40 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2012 OpenStack, LLC
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import httplib2
|
||||
|
||||
from glance.tests import functional
|
||||
|
||||
|
||||
class TestMultiprocessing(functional.FunctionalTest):
|
||||
"""Functional tests for the bin/glance CLI tool"""
|
||||
|
||||
def setUp(self):
|
||||
self.workers = 2
|
||||
super(TestMultiprocessing, self).setUp()
|
||||
|
||||
def test_multiprocessing(self):
|
||||
"""Spin up the api servers with multiprocessing on"""
|
||||
self.cleanup()
|
||||
self.start_servers(**self.__dict__.copy())
|
||||
|
||||
path = "http://%s:%d/v1/images" % ("0.0.0.0", self.api_port)
|
||||
http = httplib2.Http()
|
||||
response, content = http.request(path, 'GET')
|
||||
self.assertEqual(response.status, 200)
|
||||
self.assertEqual(content, '{"images": []}')
|
||||
self.stop_servers()
|
Loading…
Reference in New Issue
Block a user