Enable multi-process for API service

Due to the limitations of the Python interpreter (the Global
Interpreter Lock), the Cinder API service cannot really utilize the
underlying multi-core architecture, even when libraries like eventlet
are used. To make the API service more scalable, we adopt the
multi-process (worker) mode that has long been used in
Glance/Swift/Nova.

The default behavior is not changed by this patch: the Cinder API
service still runs in a single process (the default value of
osapi_volume_workers is None).

Implementation-wise, a good portion of cinder/service.py has been
removed because that content has been merged into the Oslo version of
the service module. cinder/wsgi.py is also updated to accommodate
multiple WSGI servers running in separate processes.
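
The resulting launch pattern is uniform across the binaries; a
minimal sketch using the names introduced by this patch (taken from
the bin/cinder-api diff below):

    from cinder import service

    # process_launcher() wraps the Oslo ProcessLauncher.
    launcher = service.process_launcher()
    # WSGIService reads osapi_volume_workers into server.workers.
    server = service.WSGIService('osapi_volume')
    # Fork one child per worker; fall back to a single process when
    # osapi_volume_workers is unset (server.workers is None).
    launcher.launch_service(server, workers=server.workers or 1)
    launcher.wait()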

Implement bp: multi-process-api-service

DocImpact: 'New config option osapi_volume_workers is used to specify
the number of API service workers (OS processes) to launch for the
Cinder API service.  Setting this option to an appropriate value
(e.g. osapi_volume_workers = number of CPU cores/threads of the
machine) can greatly improve the total throughput of the API service
[number of API requests that can be handled per second].'
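
For illustration only (the specific value is an assumption, not a
recommendation made by this patch), a cinder.conf on an 8-core API
node might contain:

    [DEFAULT]
    # Launch eight API workers, roughly one per CPU core
    osapi_volume_workers = 8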

Also removed outdated comments in bin/cinder-api, since this bug [1]
was fixed in eventlet 0.9.13.

[1] https://bitbucket.org/eventlet/eventlet/issue/92/eventletgreen-override-of-oswaitpid

Change-Id: I8361d0dc0d43040e48634ff1aee1324e5e0af466
Author: Zhiteng Huang
Date:   2014-01-08 14:50:29 +08:00
parent d540eb6f7c
commit 4576ca73ea

8 changed files with 59 additions and 321 deletions

bin/cinder-all

@@ -62,17 +62,17 @@ if __name__ == '__main__':
     LOG = logging.getLogger('cinder.all')
     utils.monkey_patch()
-    servers = []
+    launcher = service.process_launcher()
     # cinder-api
     try:
-        servers.append(service.WSGIService('osapi_volume'))
+        server = service.WSGIService('osapi_volume')
+        launcher.launch_service(server, workers=server.workers or 1)
     except (Exception, SystemExit):
         LOG.exception(_('Failed to load osapi_volume'))

     for binary in ['cinder-volume', 'cinder-scheduler', 'cinder-backup']:
         try:
-            servers.append(service.Service.create(binary=binary))
+            launcher.launch_service(service.Service.create(binary=binary))
         except (Exception, SystemExit):
             LOG.exception(_('Failed to load %s'), binary)
-    service.serve(*servers)
-    service.wait()
+    launcher.wait()

bin/cinder-api

@@ -17,10 +17,6 @@
 """Starter script for Cinder OS API."""

-# NOTE(jdg): If we port over multi worker code from Nova
-# we'll need to set monkey_patch(os=False), unless
-# eventlet is updated/released to fix the root issue
 import eventlet
 eventlet.monkey_patch()
@@ -54,6 +50,8 @@ if __name__ == '__main__':
         version=version.version_string())
     logging.setup("cinder")
     utils.monkey_patch()
+    launcher = service.process_launcher()
     server = service.WSGIService('osapi_volume')
-    service.serve(server)
-    service.wait()
+    launcher.launch_service(server, workers=server.workers or 1)
+    launcher.wait()

bin/cinder-backup

@@ -54,7 +54,6 @@ if __name__ == '__main__':
         version=version.version_string())
     logging.setup("cinder")
     utils.monkey_patch()
-    launcher = service.ProcessLauncher()
     server = service.Service.create(binary='cinder-backup')
-    launcher.launch_server(server)
-    launcher.wait()
+    service.serve(server)
+    service.wait()

bin/cinder-volume

@@ -58,18 +58,17 @@ if __name__ == '__main__':
         version=version.version_string())
     logging.setup("cinder")
     utils.monkey_patch()
-    if os.name == 'nt':
-        launcher = service
-        launcher.launch_server = service.serve
-    else:
-        launcher = service.ProcessLauncher()
+    # Note(zhiteng): Since Windows (os='nt') has already ignored monkey
+    # patching 'os' module, there is no need to treat it differently
+    # when creating launcher.
+    launcher = service.process_launcher()
     if CONF.enabled_backends:
         for backend in CONF.enabled_backends:
             host = "%s@%s" % (CONF.host, backend)
             server = service.Service.create(host=host,
                                             service_name=backend)
-            launcher.launch_server(server)
+            launcher.launch_service(server)
     else:
         server = service.Service.create(binary='cinder-volume')
-        launcher.launch_server(server)
+        launcher.launch_service(server)
     launcher.wait()

cinder/service.py

@@ -18,16 +18,10 @@
 """Generic Node base class for all workers that run on hosts."""

-import errno
 import inspect
 import os
 import random
-import signal
-import sys
-import time

-import eventlet
-import greenlet

 from oslo.config import cfg

 from cinder import context
@@ -37,6 +31,7 @@ from cinder.openstack.common import importutils
 from cinder.openstack.common import log as logging
 from cinder.openstack.common import loopingcall
 from cinder.openstack.common import rpc
+from cinder.openstack.common import service
 from cinder import version
 from cinder import wsgi
@@ -60,274 +55,15 @@ service_opts = [
                help='IP address for OpenStack Volume API to listen'),
     cfg.IntOpt('osapi_volume_listen_port',
                default=8776,
-               help='port for os volume api to listen'), ]
+               help='port for os volume api to listen'),
+    cfg.IntOpt('osapi_volume_workers',
+               help='Number of workers for OpenStack Volume API service'), ]

 CONF = cfg.CONF
 CONF.register_opts(service_opts)


-class SignalExit(SystemExit):
-    def __init__(self, signo, exccode=1):
-        super(SignalExit, self).__init__(exccode)
-        self.signo = signo
-
-
-class Launcher(object):
-    """Launch one or more services and wait for them to complete."""
-
-    def __init__(self):
-        """Initialize the service launcher.
-
-        :returns: None
-        """
-        self._services = []
-
-    @staticmethod
-    def run_server(server):
-        """Start and wait for a server to finish.
-
-        :param server: Server to run and wait for.
-        :returns: None
-        """
-        server.start()
-        server.wait()
-
-    def launch_server(self, server):
-        """Load and start the given server.
-
-        :param server: The server you would like to start.
-        :returns: None
-        """
-        gt = eventlet.spawn(self.run_server, server)
-        self._services.append(gt)
-
-    def stop(self):
-        """Stop all services which are currently running.
-
-        :returns: None
-        """
-        for service in self._services:
-            service.kill()
-
-    def wait(self):
-        """Waits until all services have been stopped, and then returns.
-
-        :returns: None
-        """
-        def sigterm(sig, frame):
-            LOG.audit(_("SIGTERM received"))
-            # NOTE(jk0): Raise a ^C which is caught by the caller and cleanly
-            # shuts down the service. This does not yet handle eventlet
-            # threads.
-            raise KeyboardInterrupt
-
-        signal.signal(signal.SIGTERM, sigterm)
-
-        for service in self._services:
-            try:
-                service.wait()
-            except greenlet.GreenletExit:
-                pass
-
-
-class ServerWrapper(object):
-    def __init__(self, server, workers):
-        self.server = server
-        self.workers = workers
-        self.children = set()
-        self.forktimes = []
-        self.failed = False
-
-
-class ProcessLauncher(object):
-    def __init__(self):
-        self.children = {}
-        self.sigcaught = None
-        self.totalwrap = 0
-        self.failedwrap = 0
-        self.running = True
-        rfd, self.writepipe = os.pipe()
-        self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
-
-        signal.signal(signal.SIGTERM, self._handle_signal)
-        signal.signal(signal.SIGINT, self._handle_signal)
-
-    def _handle_signal(self, signo, frame):
-        self.sigcaught = signo
-        self.running = False
-
-        # Allow the process to be killed again and die from natural causes
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
-        signal.signal(signal.SIGINT, signal.SIG_DFL)
-
-    def _pipe_watcher(self):
-        # This will block until the write end is closed when the parent
-        # dies unexpectedly
-        self.readpipe.read()
-
-        LOG.info(_('Parent process has died unexpectedly, exiting'))
-
-        sys.exit(1)
-
-    def _child_process(self, server):
-        # Setup child signal handlers differently
-        def _sigterm(*args):
-            signal.signal(signal.SIGTERM, signal.SIG_DFL)
-            raise SignalExit(signal.SIGTERM)
-
-        signal.signal(signal.SIGTERM, _sigterm)
-        # Block SIGINT and let the parent send us a SIGTERM
-        # signal.signal(signal.SIGINT, signal.SIG_IGN)
-        # This differs from the behavior in nova in that we don't ignore this
-        # It allows the non-wsgi services to be terminated properly
-        signal.signal(signal.SIGINT, _sigterm)
-
-        # Reopen the eventlet hub to make sure we don't share an epoll
-        # fd with parent and/or siblings, which would be bad
-        eventlet.hubs.use_hub()
-
-        # Close write to ensure only parent has it open
-        os.close(self.writepipe)
-        # Create greenthread to watch for parent to close pipe
-        eventlet.spawn(self._pipe_watcher)
-
-        # Reseed random number generator
-        random.seed()
-
-        launcher = Launcher()
-        launcher.run_server(server)
-
-    def _start_child(self, wrap):
-        if len(wrap.forktimes) > wrap.workers:
-            # Limit ourselves to one process a second (over the period of
-            # number of workers * 1 second). This will allow workers to
-            # start up quickly but ensure we don't fork off children that
-            # die instantly too quickly.
-            if time.time() - wrap.forktimes[0] < wrap.workers:
-                LOG.info(_('Forking too fast, sleeping'))
-                time.sleep(1)
-
-            wrap.forktimes.pop(0)
-
-        wrap.forktimes.append(time.time())
-
-        pid = os.fork()
-        if pid == 0:
-            # NOTE(johannes): All exceptions are caught to ensure this
-            # doesn't fallback into the loop spawning children. It would
-            # be bad for a child to spawn more children.
-            status = 0
-            try:
-                self._child_process(wrap.server)
-            except SignalExit as exc:
-                signame = {signal.SIGTERM: 'SIGTERM',
-                           signal.SIGINT: 'SIGINT'}[exc.signo]
-                LOG.info(_('Caught %s, exiting'), signame)
-                status = exc.code
-            except SystemExit as exc:
-                status = exc.code
-            except BaseException:
-                LOG.exception(_('Unhandled exception'))
-                status = 2
-            finally:
-                wrap.server.stop()
-
-            os._exit(status)
-
-        LOG.info(_('Started child %d'), pid)
-
-        wrap.children.add(pid)
-        self.children[pid] = wrap
-
-        return pid
-
-    def launch_server(self, server, workers=1):
-        wrap = ServerWrapper(server, workers)
-        self.totalwrap = self.totalwrap + 1
-        LOG.info(_('Starting %d workers'), wrap.workers)
-        while (self.running and len(wrap.children) < wrap.workers
-               and not wrap.failed):
-            self._start_child(wrap)
-
-    def _wait_child(self):
-        try:
-            # Don't block if no child processes have exited
-            pid, status = os.waitpid(0, os.WNOHANG)
-            if not pid:
-                return None
-        except OSError as exc:
-            if exc.errno not in (errno.EINTR, errno.ECHILD):
-                raise
-            return None
-
-        code = 0
-        if os.WIFSIGNALED(status):
-            sig = os.WTERMSIG(status)
-            LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
-                     {'pid': pid, 'sig': sig})
-        else:
-            code = os.WEXITSTATUS(status)
-            LOG.info(_('Child %(pid)d exited with status %(code)d'),
-                     {'pid': pid, 'code': code})
-
-        if pid not in self.children:
-            LOG.warning(_('pid %d not in child list'), pid)
-            return None
-
-        wrap = self.children.pop(pid)
-        wrap.children.remove(pid)
-        if 2 == code:
-            wrap.failed = True
-            self.failedwrap = self.failedwrap + 1
-            LOG.info(_('_wait_child %d'), self.failedwrap)
-            if self.failedwrap == self.totalwrap:
-                self.running = False
-        return wrap
-
-    def wait(self):
-        """Loop waiting on children to die and respawning as necessary."""
-        while self.running:
-            wrap = self._wait_child()
-            if not wrap:
-                # Yield to other threads if no children have exited
-                # Sleep for a short time to avoid excessive CPU usage
-                # (see bug #1095346)
-                eventlet.greenthread.sleep(.01)
-                continue
-
-            LOG.info(_('wait wrap.failed %s'), wrap.failed)
-            while (self.running and len(wrap.children) < wrap.workers
-                   and not wrap.failed):
-                self._start_child(wrap)
-
-        if self.sigcaught:
-            signame = {signal.SIGTERM: 'SIGTERM',
-                       signal.SIGINT: 'SIGINT'}[self.sigcaught]
-            LOG.info(_('Caught %s, stopping children'), signame)
-
-        for pid in self.children:
-            try:
-                os.kill(pid, signal.SIGTERM)
-            except OSError as exc:
-                if exc.errno != errno.ESRCH:
-                    raise
-
-        # Wait for children to die
-        if self.children:
-            LOG.info(_('Waiting on %d children to exit'), len(self.children))
-            while self.children:
-                wrap = self._wait_child()
-                if not wrap:
-                    eventlet.greenthread.sleep(.01)
-                    continue
-
-
-class Service(object):
+class Service(service.Service):
     """Service object for binaries running on hosts.

     A service takes a manager and enables rpc by listening to queues based
@@ -338,6 +74,7 @@ class Service(object):
     def __init__(self, host, binary, topic, manager, report_interval=None,
                  periodic_interval=None, periodic_fuzzy_delay=None,
                  service_name=None, *args, **kwargs):
+        super(Service, self).__init__()
         self.host = host
         self.binary = binary
         self.topic = topic
@@ -349,7 +86,6 @@
         self.report_interval = report_interval
         self.periodic_interval = periodic_interval
         self.periodic_fuzzy_delay = periodic_fuzzy_delay
-        super(Service, self).__init__(*args, **kwargs)
         self.saved_args, self.saved_kwargs = args, kwargs
         self.timers = []
@@ -476,6 +212,8 @@ class Service(object):
                 pass
         self.timers = []

+        super(Service, self).stop()
+
     def wait(self):
         for x in self.timers:
             try:
@@ -538,6 +276,13 @@ class WSGIService(object):
         self.app = self.loader.load_app(name)
         self.host = getattr(CONF, '%s_listen' % name, "0.0.0.0")
         self.port = getattr(CONF, '%s_listen_port' % name, 0)
+        self.workers = getattr(CONF, '%s_workers' % name, None)
+        if self.workers < 1:
+            LOG.warn(_("Value of config option %(name)s_workers must be "
+                       "integer greater than 1.  Input value ignored.") %
+                     {'name': name})
+            # Reset workers to default
+            self.workers = None
         self.basic_config_check()
         self.server = wsgi.Server(name,
                                   self.app,
@@ -612,18 +357,22 @@
         self.server.wait()


+def process_launcher():
+    return service.ProcessLauncher()
+
+
 # NOTE(vish): the global launcher is to maintain the existing
 #             functionality of calling service.serve +
 #             service.wait
 _launcher = None


-def serve(*servers):
+def serve(server, workers=None):
     global _launcher
-    if not _launcher:
-        _launcher = Launcher()
-    for server in servers:
-        _launcher.launch_server(server)
+    if _launcher:
+        raise RuntimeError(_('serve() can only be called once'))
+
+    _launcher = service.launch(server, workers=workers)


 def wait():

cinder/tests/test_service.py

@@ -214,18 +214,3 @@ class TestWSGIService(test.TestCase):
         CONF.set_override('report_interval', 10)
         service.WSGIService("test_service")
         self.assertEqual(CONF.service_down_time, 25)
-
-
-class TestLauncher(test.TestCase):
-
-    def setUp(self):
-        super(TestLauncher, self).setUp()
-        self.stubs.Set(wsgi.Loader, "load_app", mox.MockAnything())
-        self.service = service.WSGIService("test_service")
-
-    def test_launch_app(self):
-        self.assertEqual(0, self.service.port)
-        launcher = service.Launcher()
-        launcher.launch_server(self.service)
-        self.assertEqual(0, self.service.port)
-        launcher.stop()

cinder/wsgi.py

@@ -77,7 +77,7 @@ class Server(object):
     default_pool_size = 1000

     def __init__(self, name, app, host=None, port=None, pool_size=None,
-                 protocol=eventlet.wsgi.HttpProtocol):
+                 protocol=eventlet.wsgi.HttpProtocol, backlog=128):
         """Initialize, but do not start, a WSGI server.

         :param name: Pretty name for logging.
@@ -99,6 +99,13 @@ class Server(object):
         self._logger = logging.getLogger("eventlet.wsgi.server")
         self._wsgi_logger = logging.WritableLogger(self._logger)

+        if backlog < 1:
+            raise exception.InvalidInput(
+                reason='The backlog must be more than 1')
+
+        self._socket = self._get_socket(self._host,
+                                        self._port,
+                                        backlog=backlog)
+
     def _get_socket(self, host, port, backlog):
         bind_addr = (host, port)
         # TODO(dims): eventlet's green dns/socket module does not actually
@@ -197,13 +204,6 @@ class Server(object):
         :raises: cinder.exception.InvalidInput

         """
-        if backlog < 1:
-            raise exception.InvalidInput(
-                reason='The backlog must be more than 1')
-
-        self._socket = self._get_socket(self._host,
-                                        self._port,
-                                        backlog=backlog)
         self._server = eventlet.spawn(self._start)
         (self._host, self._port) = self._socket.getsockname()[0:2]
         LOG.info(_("Started %(name)s on %(host)s:%(port)s") %
@@ -227,7 +227,10 @@ class Server(object):
         """
         LOG.info(_("Stopping WSGI server."))
-        self._server.kill()
+        if self._server is not None:
+            # Resize pool to stop new requests from being processed
+            self._pool.resize(0)
+            self._server.kill()
@@ -238,7 +241,8 @@ class Server(object):
         """
         try:
-            self._server.wait()
+            if self._server is not None:
+                self._server.wait()
         except greenlet.GreenletExit:
             LOG.info(_("WSGI server has stopped."))

etc/cinder/cinder.conf.sample

@@ -76,6 +76,10 @@
 # port for os volume api to listen (integer value)
 #osapi_volume_listen_port=8776

+# Number of workers for OpenStack Volume API service (integer
+# value)
+#osapi_volume_workers=<None>
+

 #
 # Options defined in cinder.test