blueprint <multi-process-api-service>
Add multiprocess support for API services (EC2/OSAPI_Compute/OSAPI_Volume/Metadata).
2012-06-01 v7:
* Add unittest to cover worker recovery, service termination functionality
in wsgi.py, fix python 2.6 compatibility issue.
* Modify generate_uid() to introduce per-process seeds in utils.py to avoid
collisions.
* Add worker session to nova.conf.sample.
2012-05-21 v6:
* Fix 'test_wsgi' unittest error.
2012-04-28 v5:
* Add SIGINT handler and fix child-parent race condition when Ctrl+C is
pressed.
2012-03-31 v4:
* Fixed typo, removed debug code.
2012-03-30 v3:
* Fixed localization/pep8 error in unittest, add metadata test.
* nova/wsgi.py:Server: use the greenthread pool created for each process.
* nova/service.py: remove debug code
2012-03-27 v2:
* Fixed unittest error.
* nova/wsgi.py:Server: Use self._logger to do logging in multiprocess mode.
* nova/wsgi.py:Server: Move self._pool creation into proper place.
* code style fix.
2012-03-25 v1:
* Modification to nova/service.py and nova/wsgi.py in order to support
multiprocess (a.k.a. workers) for various API services. If multiprocess
mode is enabled, (i.e. flags 'APINAME_workers' set to positive numbers),
corresponding API service will run in target number of process(es). There
is also a master_worker process spawned for managing all workers (handling
signal/termination).
* Add unittest for multiprocess API service, also alter testing/runner.py
to adopt new unittest.
Change-Id: Ia045e595543ddfd192894b2a05801cc4b7ca90cb
This commit is contained in:
1
Authors
1
Authors
@@ -213,6 +213,7 @@ Yun Mao <yunmao@gmail.com>
|
||||
Yun Shen <Yun.Shen@hp.com>
|
||||
Yuriy Taraday <yorik.sar@gmail.com>
|
||||
Zed Shaw <zedshaw@zedshaw.com>
|
||||
Zhiteng Huang <zhiteng.huang@intel.com>
|
||||
Zhixue Wu <Zhixue.Wu@citrix.com>
|
||||
Zhongyue Luo <lzyeval@gmail.com>
|
||||
Ziad Sawalha <github@highbridgellc.com>
|
||||
|
||||
@@ -24,7 +24,7 @@ Starts both the EC2 and OpenStack APIs in separate greenthreads.
|
||||
"""
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
"""Starter script for Nova EC2 API."""
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
"""Starter script for Nova Metadata API."""
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
"""Starter script for Nova OS API."""
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
"""Starter script for Nova OS API."""
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
@@ -61,12 +61,18 @@ service_opts = [
|
||||
cfg.IntOpt('ec2_listen_port',
|
||||
default=8773,
|
||||
help='port for ec2 api to listen'),
|
||||
cfg.IntOpt('ec2_workers',
|
||||
default=0,
|
||||
help='Number of workers for EC2 API service'),
|
||||
cfg.StrOpt('osapi_compute_listen',
|
||||
default="0.0.0.0",
|
||||
help='IP address for OpenStack API to listen'),
|
||||
cfg.IntOpt('osapi_compute_listen_port',
|
||||
default=8774,
|
||||
help='list port for osapi compute'),
|
||||
cfg.IntOpt('osapi_compute_workers',
|
||||
default=0,
|
||||
help='Number of workers for OpenStack API service'),
|
||||
cfg.StrOpt('metadata_manager',
|
||||
default='nova.api.manager.MetadataManager',
|
||||
help='OpenStack metadata service manager'),
|
||||
@@ -76,12 +82,18 @@ service_opts = [
|
||||
cfg.IntOpt('metadata_listen_port',
|
||||
default=8775,
|
||||
help='port for metadata api to listen'),
|
||||
cfg.IntOpt('metadata_workers',
|
||||
default=0,
|
||||
help='Number of workers for metadata service'),
|
||||
cfg.StrOpt('osapi_volume_listen',
|
||||
default="0.0.0.0",
|
||||
help='IP address for OpenStack Volume API to listen'),
|
||||
cfg.IntOpt('osapi_volume_listen_port',
|
||||
default=8776,
|
||||
help='port for os volume api to listen')
|
||||
help='port for os volume api to listen'),
|
||||
cfg.IntOpt('osapi_volume_workers',
|
||||
default=0,
|
||||
help='Number of workers for OpenStack Volume API service')
|
||||
]
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
@@ -135,14 +147,6 @@ class Launcher(object):
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
def sigterm(sig, frame):
|
||||
LOG.audit(_("SIGTERM received"))
|
||||
# NOTE(jk0): Raise a ^C which is caught by the caller and cleanly
|
||||
# shuts down the service. This does not yet handle eventlet
|
||||
# threads.
|
||||
raise KeyboardInterrupt
|
||||
|
||||
signal.signal(signal.SIGTERM, sigterm)
|
||||
|
||||
for service in self._services:
|
||||
try:
|
||||
@@ -362,10 +366,12 @@ class WSGIService(object):
|
||||
self.app = self.loader.load_app(name)
|
||||
self.host = getattr(FLAGS, '%s_listen' % name, "0.0.0.0")
|
||||
self.port = getattr(FLAGS, '%s_listen_port' % name, 0)
|
||||
self.workers = getattr(FLAGS, '%s_workers' % name, 0)
|
||||
self.server = wsgi.Server(name,
|
||||
self.app,
|
||||
host=self.host,
|
||||
port=self.port)
|
||||
port=self.port,
|
||||
workers=self.workers)
|
||||
|
||||
def _get_manager(self):
|
||||
"""Initialize a Manager object appropriate for this service.
|
||||
|
||||
@@ -366,5 +366,5 @@ def run():
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
eventlet.monkey_patch()
|
||||
eventlet.monkey_patch(os=False)
|
||||
run()
|
||||
|
||||
@@ -63,6 +63,7 @@ LOG = logging.getLogger(__name__)
|
||||
ISO_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
|
||||
PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
|
||||
FLAGS = flags.FLAGS
|
||||
RESEED = True
|
||||
|
||||
FLAGS.register_opt(
|
||||
cfg.BoolOpt('disable_process_locking', default=False,
|
||||
@@ -290,6 +291,13 @@ def debug(arg):
|
||||
|
||||
|
||||
def generate_uid(topic, size=8):
    """Generate a pseudo-random uid string prefixed with *topic*.

    :param topic: prefix for the generated id (e.g. a service name)
    :param size: number of random characters in the suffix
    :returns: a string of the form '<topic>-<size random chars>'
    """
    global RESEED
    if RESEED:
        # Re-seed once per process: worker processes forked from a common
        # parent inherit the same PRNG state, so without a per-process seed
        # every worker would produce the same "random" uid sequence.
        random.seed("%d%s%s" % (os.getpid(),
                                socket.gethostname(),
                                time.time()))
        RESEED = False

    # NOTE(review): the original alphabet was '01234567890abc...' with '0'
    # listed twice, doubling the probability of '0' in generated uids.
    # Each character now appears exactly once.
    characters = '0123456789abcdefghijklmnopqrstuvwxyz'
    choices = [random.choice(characters) for _x in xrange(size)]
    return '%s-%s' % (topic, ''.join(choices))
|
||||
|
||||
166
nova/wsgi.py
166
nova/wsgi.py
@@ -19,12 +19,16 @@
|
||||
|
||||
"""Utility methods for working with WSGI servers."""
|
||||
|
||||
import os.path
|
||||
import errno
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
|
||||
import eventlet
|
||||
from eventlet.green import socket
|
||||
import eventlet.wsgi
|
||||
import greenlet
|
||||
import multiprocessing
|
||||
from paste import deploy
|
||||
import routes.middleware
|
||||
import webob.dec
|
||||
@@ -45,14 +49,15 @@ class Server(object):
|
||||
|
||||
default_pool_size = 1000
|
||||
|
||||
def __init__(self, name, app, host=None, port=None, pool_size=None,
|
||||
protocol=eventlet.wsgi.HttpProtocol):
|
||||
def __init__(self, name, app, host=None, port=None, workers=None,
|
||||
pool_size=None, protocol=eventlet.wsgi.HttpProtocol):
|
||||
"""Initialize, but do not start, a WSGI server.
|
||||
|
||||
:param name: Pretty name for logging.
|
||||
:param app: The WSGI application to serve.
|
||||
:param host: IP address to serve the application.
|
||||
:param port: Port number to server the application.
|
||||
:param workers: Number of process to spawn concurrently
|
||||
:param pool_size: Maximum number of eventlets to spawn concurrently.
|
||||
:returns: None
|
||||
|
||||
@@ -61,12 +66,17 @@ class Server(object):
|
||||
self.app = app
|
||||
self.host = host or "0.0.0.0"
|
||||
self.port = port or 0
|
||||
self.workers = workers or 0
|
||||
self._server = None
|
||||
self._socket = None
|
||||
self._protocol = protocol
|
||||
self._pool = eventlet.GreenPool(pool_size or self.default_pool_size)
|
||||
self._logger = logging.getLogger("eventlet.wsgi.server")
|
||||
self._pool_size = pool_size or self.default_pool_size
|
||||
self._pool = None
|
||||
self._logger = logging.getLogger("nova.%s.wsgi.server" % self.name)
|
||||
self._wsgi_logger = logging.WritableLogger(self._logger)
|
||||
self.master_worker = None
|
||||
self.children = []
|
||||
self.running = True
|
||||
|
||||
def _start(self):
|
||||
"""Run the blocking eventlet WSGI server.
|
||||
@@ -90,11 +100,124 @@ class Server(object):
|
||||
"""
|
||||
if backlog < 1:
|
||||
raise exception.InvalidInput(
|
||||
reason='The backlog must be more than 1')
|
||||
self._socket = eventlet.listen((self.host, self.port), backlog=backlog)
|
||||
self._server = eventlet.spawn(self._start)
|
||||
(self.host, self.port) = self._socket.getsockname()
|
||||
LOG.info(_("Started %(name)s on %(host)s:%(port)s") % self.__dict__)
|
||||
reason='The backlog must be more than 1')
|
||||
|
||||
try:
|
||||
self._socket = eventlet.listen((self.host, self.port),
|
||||
backlog=backlog)
|
||||
(self.host, self.port) = self._socket.getsockname()
|
||||
except socket.error, err:
|
||||
if err[0] != errno.EINVAL:
|
||||
raise
|
||||
|
||||
if self.workers == 0:
|
||||
# single process mode, useful for profiling, test, debug etc.
|
||||
self._pool = eventlet.GreenPool(self._pool_size)
|
||||
self._server = self._pool.spawn(self._start)
|
||||
LOG.info(_("Started %(name)s on %(host)s:%(port)s") %
|
||||
self.__dict__)
|
||||
return None
|
||||
|
||||
# master_worker doesn't actually do work (i.e. handle API request)
|
||||
# but it's a managing process to handle signal/termination for
|
||||
# this type of API service, only needed if workers > 1
|
||||
self.master_worker = multiprocessing.Process(target=self.run_workers,
|
||||
args=())
|
||||
self.master_worker.start()
|
||||
self._logger.info(_("Started %(name)s on %(host)s:%(port)s in process")
|
||||
% self.__dict__)
|
||||
return None
|
||||
|
||||
def run_server_in_process(self):
|
||||
"""Run a WSGI server."""
|
||||
eventlet.wsgi.HttpProtocol.default_request_version = "HTTP/1.0"
|
||||
eventlet.hubs.use_hub('poll')
|
||||
eventlet.patcher.monkey_patch(all=False, socket=True)
|
||||
|
||||
self._pool = eventlet.GreenPool(size=self._pool_size)
|
||||
try:
|
||||
self._pool.spawn_n(self._start)
|
||||
except socket.error, err:
|
||||
if err[0] != errno.EINVAL:
|
||||
raise
|
||||
|
||||
self._pool.waitall()
|
||||
|
||||
def run_workers(self):
    """Spawn the configured number of workers and block reaping them.

    Runs inside the master_worker process: installs signal handlers,
    forks ``self.workers`` children, then waits on them (respawning
    dead ones) until shutdown is requested.
    """
    def terminate_children(*args):
        """Kills the entire process group."""
        #TODO(zhiteng) Gracefully kill all eventlet greenthread
        self._logger.error(_('SIGTERM or SIGINT received'))
        # Ignore further termination signals while we tear down, so a
        # repeated Ctrl+C does not re-enter this handler.
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        self.running = False
        for child_pid in self.children:
            os.kill(child_pid, signal.SIGTERM)

    def graceful_shutdown(*args):
        """
        Shuts down the server, but allows running requests to complete
        """
        self._logger.error(_('SIGHUP received'))
        signal.signal(signal.SIGHUP, signal.SIG_IGN)
        self.running = False

    signal.signal(signal.SIGTERM, terminate_children)
    signal.signal(signal.SIGINT, terminate_children)
    signal.signal(signal.SIGHUP, graceful_shutdown)

    # Fork until the target worker count is reached; run_child appends
    # each new pid to self.children.
    while len(self.children) < self.workers:
        self.run_child()

    self._logger.info(_("Started %(children_count)d worker for %(name)s")
                      % {'children_count': len(self.children),
                         'name': self.name})

    self.wait_on_children()
|
||||
|
||||
def run_child(self):
    """Fork one worker process.

    Parent side: records the child's pid in ``self.children`` and
    returns.  Child side: resets signal handlers, serves requests until
    shutdown, then exits the process without ever returning to the
    caller's worker-management loop.

    :returns: None
    """
    try:
        pid = os.fork()
    except KeyboardInterrupt:
        # Interrupted before the fork happened: no child exists, so
        # there is nothing to record.  (The original fell through here
        # with 'pid' unbound, raising NameError on the check below.)
        return None
    if pid == 0:
        # Child: restore default handlers so the worker responds to
        # SIGHUP/SIGTERM itself rather than inheriting the master's.
        signal.signal(signal.SIGHUP, signal.SIG_DFL)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        # to avoid race condition that child receive signal before
        # parent and is respawned
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        try:
            self.run_server_in_process()
        except KeyboardInterrupt:
            pass
        self._logger.info(_('Child %d exiting normally') % os.getpid())
        # Exit with os._exit() instead of returning: returning would
        # drop the child back into run_workers' while-loop (inherited
        # via fork) and make it fork grandchildren of its own.
        os._exit(0)
    else:
        self._logger.info(_('[%(name)s] Started worker (pid: %(pid)s)') %
                          {'name': self.name,
                           'pid': pid})
        self.children.append(pid)
|
||||
|
||||
def wait_on_children(self):
|
||||
while self.running:
|
||||
try:
|
||||
pid, status = os.wait()
|
||||
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
|
||||
self._logger.error(_('Dead worker %(pid)s') % locals())
|
||||
if pid in self.children:
|
||||
self.children.remove(pid)
|
||||
self.run_child()
|
||||
except OSError, err:
|
||||
if err.errno not in (errno.EINTR, errno.ECHILD):
|
||||
raise
|
||||
except KeyboardInterrupt:
|
||||
self._logger.info(_('Caught keyboard interrupt. Exiting.'))
|
||||
self.running = False
|
||||
break
|
||||
eventlet.greenio.shutdown_safe(self._socket)
|
||||
self._socket.close()
|
||||
self._logger.debug(_('Exited'))
|
||||
|
||||
def stop(self):
    """Stop this server.

    In multi-process mode every worker is sent SIGTERM and the managing
    master_worker process is terminated.  In single-process mode the
    green pool is shrunk to zero (no new connections accepted) and the
    server greenthread is killed.

    :returns: None

    """
    LOG.info(_("Stopping WSGI server."))
    if self.workers > 0:
        # set running state to false and kill all workers
        self.running = False
        # Iterate over a snapshot: the original removed pids from
        # self.children while iterating it, which skips every other
        # worker and leaves them running.
        for pid in list(self.children):
            os.kill(pid, signal.SIGTERM)
        self.children = []
        # now terminate master_worker
        if self.master_worker is not None and self.master_worker.is_alive():
            self.master_worker.terminate()
    else:
        # Resize pool to stop accepting new connections
        self._pool.resize(0)
        self._server.kill()
|
||||
|
||||
def wait(self):
    """Block, until the server has stopped.

    :returns: None

    """
    try:
        if not (self.workers and self.master_worker):
            # single-process mode: wait for every greenthread in the
            # pool (including the server thread) to finish
            self._pool.waitall()
        # otherwise a dedicated master_worker process is already doing
        # the waiting on the workers for us
    except greenlet.GreenletExit:
        LOG.info(_("WSGI server has stopped."))
    except KeyboardInterrupt:
        pass
|
||||
|
||||
|
||||
class Request(webob.Request):
|
||||
|
||||
Reference in New Issue
Block a user