From 2b39c106f23ce3b38dce57b0b8516317cf2d5c9a Mon Sep 17 00:00:00 2001 From: Zhiteng Huang Date: Sun, 25 Mar 2012 02:06:01 +0800 Subject: [PATCH] blueprint Add multiprocess support for API services (EC2/OSAPI_Compute/OSAPI_Volume/Metadata). 2012-06-1 v7: * Add unittest to cover worker recovery, service termination functionality in wsgi.py, fix python 2.6 compatibility issue. * Modify generate_uid() to introduce per-process seeds in utils.py to avoid collisions. * Add worker session to nova.conf.sample. 2012-05-21 v6: * Fix 'test_wsgi' unittest error. 2012-04-28 v5: * Add SIGINT handler and fix child-parent race condition when Ctrl+C is pressed. 2012-03-31 v4: * Fixed typo, removed debug code. 2012-03-30 v3: * Fixed localization/pep8 error in unittest, add metadata test. * nova/wsgi.py:Server: use the greenthread pool created for each process. * nova/service.py: remove debug code 2012-03-27 v2: * Fixed unittest error. * nova/wsgi.py:Server: Use self._logger to do logging in multiprocess mode. * nova/wsgi.py:Server: Move self._pool creation into proper place. * code style fix. 2012-03-25 v1: * Modification to nova/service.py and nova/wsgi.py in order to support multiprocess (a.k.a. workers) for various API services. If multiprocess mode is enabled (i.e. flags 'APINAME_workers' set to positive numbers), the corresponding API service will run in the target number of process(es). There is also a master_worker process spawned for managing all workers (handling signal/termination). * Add unittest for multiprocess API service, also alter testing/runner.py to adopt new unittest. 
Change-Id: Ia045e595543ddfd192894b2a05801cc4b7ca90cb --- Authors | 1 + bin/nova-api | 2 +- bin/nova-api-ec2 | 2 +- bin/nova-api-metadata | 2 +- bin/nova-api-os-compute | 2 +- bin/nova-api-os-volume | 2 +- nova/service.py | 26 ++++--- nova/testing/runner.py | 2 +- nova/utils.py | 8 ++ nova/wsgi.py | 166 +++++++++++++++++++++++++++++++++++++--- 10 files changed, 185 insertions(+), 28 deletions(-) diff --git a/Authors b/Authors index a78e0033b..527c760f0 100644 --- a/Authors +++ b/Authors @@ -213,6 +213,7 @@ Yun Mao Yun Shen Yuriy Taraday Zed Shaw +Zhiteng Huang Zhixue Wu Zhongyue Luo Ziad Sawalha diff --git a/bin/nova-api b/bin/nova-api index e6779df4f..5d15fc7c4 100755 --- a/bin/nova-api +++ b/bin/nova-api @@ -24,7 +24,7 @@ Starts both the EC2 and OpenStack APIs in separate greenthreads. """ import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) import os import sys diff --git a/bin/nova-api-ec2 b/bin/nova-api-ec2 index b53c9158a..6d94466d6 100755 --- a/bin/nova-api-ec2 +++ b/bin/nova-api-ec2 @@ -20,7 +20,7 @@ """Starter script for Nova EC2 API.""" import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) import os import sys diff --git a/bin/nova-api-metadata b/bin/nova-api-metadata index 2f2ef9454..3d78cfcee 100755 --- a/bin/nova-api-metadata +++ b/bin/nova-api-metadata @@ -20,7 +20,7 @@ """Starter script for Nova Metadata API.""" import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) import os import sys diff --git a/bin/nova-api-os-compute b/bin/nova-api-os-compute index 75c921943..978a14868 100755 --- a/bin/nova-api-os-compute +++ b/bin/nova-api-os-compute @@ -20,7 +20,7 @@ """Starter script for Nova OS API.""" import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) import os import sys diff --git a/bin/nova-api-os-volume b/bin/nova-api-os-volume index b93fd51ae..d891d0754 100755 --- a/bin/nova-api-os-volume +++ b/bin/nova-api-os-volume @@ -20,7 +20,7 @@ """Starter script for Nova OS 
API.""" import eventlet -eventlet.monkey_patch() +eventlet.monkey_patch(os=False) import os import sys diff --git a/nova/service.py b/nova/service.py index b179cda6c..136668311 100644 --- a/nova/service.py +++ b/nova/service.py @@ -61,12 +61,18 @@ service_opts = [ cfg.IntOpt('ec2_listen_port', default=8773, help='port for ec2 api to listen'), + cfg.IntOpt('ec2_workers', + default=0, + help='Number of workers for EC2 API service'), cfg.StrOpt('osapi_compute_listen', default="0.0.0.0", help='IP address for OpenStack API to listen'), cfg.IntOpt('osapi_compute_listen_port', default=8774, help='list port for osapi compute'), + cfg.IntOpt('osapi_compute_workers', + default=0, + help='Number of workers for OpenStack API service'), cfg.StrOpt('metadata_manager', default='nova.api.manager.MetadataManager', help='OpenStack metadata service manager'), @@ -76,12 +82,18 @@ service_opts = [ cfg.IntOpt('metadata_listen_port', default=8775, help='port for metadata api to listen'), + cfg.IntOpt('metadata_workers', + default=0, + help='Number of workers for metadata service'), cfg.StrOpt('osapi_volume_listen', default="0.0.0.0", help='IP address for OpenStack Volume API to listen'), cfg.IntOpt('osapi_volume_listen_port', default=8776, - help='port for os volume api to listen') + help='port for os volume api to listen'), + cfg.IntOpt('osapi_volume_workers', + default=0, + help='Number of workers for OpenStack Volume API service') ] FLAGS = flags.FLAGS @@ -135,14 +147,6 @@ class Launcher(object): :returns: None """ - def sigterm(sig, frame): - LOG.audit(_("SIGTERM received")) - # NOTE(jk0): Raise a ^C which is caught by the caller and cleanly - # shuts down the service. This does not yet handle eventlet - # threads. 
- raise KeyboardInterrupt - - signal.signal(signal.SIGTERM, sigterm) for service in self._services: try: @@ -362,10 +366,12 @@ class WSGIService(object): self.app = self.loader.load_app(name) self.host = getattr(FLAGS, '%s_listen' % name, "0.0.0.0") self.port = getattr(FLAGS, '%s_listen_port' % name, 0) + self.workers = getattr(FLAGS, '%s_workers' % name, 0) self.server = wsgi.Server(name, self.app, host=self.host, - port=self.port) + port=self.port, + workers=self.workers) def _get_manager(self): """Initialize a Manager object appropriate for this service. diff --git a/nova/testing/runner.py b/nova/testing/runner.py index 2b5004394..cdba02acb 100644 --- a/nova/testing/runner.py +++ b/nova/testing/runner.py @@ -366,5 +366,5 @@ def run(): if __name__ == '__main__': - eventlet.monkey_patch() + eventlet.monkey_patch(os=False) run() diff --git a/nova/utils.py b/nova/utils.py index ae99212e9..403a6d960 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -63,6 +63,7 @@ LOG = logging.getLogger(__name__) ISO_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S" PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" FLAGS = flags.FLAGS +RESEED = True FLAGS.register_opt( cfg.BoolOpt('disable_process_locking', default=False, @@ -290,6 +291,13 @@ def debug(arg): def generate_uid(topic, size=8): + global RESEED + if RESEED: + random.seed("%d%s%s" % (os.getpid(), + socket.gethostname(), + time.time())) + RESEED = False + characters = '01234567890abcdefghijklmnopqrstuvwxyz' choices = [random.choice(characters) for _x in xrange(size)] return '%s-%s' % (topic, ''.join(choices)) diff --git a/nova/wsgi.py b/nova/wsgi.py index 118fd14e1..27e24e210 100644 --- a/nova/wsgi.py +++ b/nova/wsgi.py @@ -19,12 +19,16 @@ """Utility methods for working with WSGI servers.""" -import os.path +import errno +import os +import signal import sys import eventlet +from eventlet.green import socket import eventlet.wsgi import greenlet +import multiprocessing from paste import deploy import routes.middleware import webob.dec @@ -45,14 
+49,15 @@ class Server(object): default_pool_size = 1000 - def __init__(self, name, app, host=None, port=None, pool_size=None, - protocol=eventlet.wsgi.HttpProtocol): + def __init__(self, name, app, host=None, port=None, workers=None, + pool_size=None, protocol=eventlet.wsgi.HttpProtocol): """Initialize, but do not start, a WSGI server. :param name: Pretty name for logging. :param app: The WSGI application to serve. :param host: IP address to serve the application. :param port: Port number to server the application. + :param workers: Number of process to spawn concurrently :param pool_size: Maximum number of eventlets to spawn concurrently. :returns: None @@ -61,12 +66,17 @@ class Server(object): self.app = app self.host = host or "0.0.0.0" self.port = port or 0 + self.workers = workers or 0 self._server = None self._socket = None self._protocol = protocol - self._pool = eventlet.GreenPool(pool_size or self.default_pool_size) - self._logger = logging.getLogger("eventlet.wsgi.server") + self._pool_size = pool_size or self.default_pool_size + self._pool = None + self._logger = logging.getLogger("nova.%s.wsgi.server" % self.name) self._wsgi_logger = logging.WritableLogger(self._logger) + self.master_worker = None + self.children = [] + self.running = True def _start(self): """Run the blocking eventlet WSGI server. 
@@ -90,11 +100,124 @@ class Server(object): """ if backlog < 1: raise exception.InvalidInput( - reason='The backlog must be more than 1') - self._socket = eventlet.listen((self.host, self.port), backlog=backlog) - self._server = eventlet.spawn(self._start) - (self.host, self.port) = self._socket.getsockname() - LOG.info(_("Started %(name)s on %(host)s:%(port)s") % self.__dict__) + reason='The backlog must be more than 1') + + try: + self._socket = eventlet.listen((self.host, self.port), + backlog=backlog) + (self.host, self.port) = self._socket.getsockname() + except socket.error, err: + if err[0] != errno.EINVAL: + raise + + if self.workers == 0: + # single process mode, useful for profiling, test, debug etc. + self._pool = eventlet.GreenPool(self._pool_size) + self._server = self._pool.spawn(self._start) + LOG.info(_("Started %(name)s on %(host)s:%(port)s") % + self.__dict__) + return None + + # master_worker doesn't actually do work (i.e. handle API request) + # but it's a managing process to handle signal/termination for + # this type of API service, only needed if workers > 1 + self.master_worker = multiprocessing.Process(target=self.run_workers, + args=()) + self.master_worker.start() + self._logger.info(_("Started %(name)s on %(host)s:%(port)s in process") + % self.__dict__) + return None + + def run_server_in_process(self): + """Run a WSGI server.""" + eventlet.wsgi.HttpProtocol.default_request_version = "HTTP/1.0" + eventlet.hubs.use_hub('poll') + eventlet.patcher.monkey_patch(all=False, socket=True) + + self._pool = eventlet.GreenPool(size=self._pool_size) + try: + self._pool.spawn_n(self._start) + except socket.error, err: + if err[0] != errno.EINVAL: + raise + + self._pool.waitall() + + def run_workers(self): + """Start workers and wait for them to join""" + def kill_children(*args): + """Kills the entire process group.""" + #TODO(zhiteng) Gracefully kill all eventlet greenthread + self._logger.error(_('SIGTERM or SIGINT received')) + 
signal.signal(signal.SIGTERM, signal.SIG_IGN) + signal.signal(signal.SIGINT, signal.SIG_IGN) + self.running = False + for pid in self.children: + os.kill(pid, signal.SIGTERM) + + def hup(*args): + """ + Shuts down the server, but allows running requests to complete + """ + self._logger.error(_('SIGHUP received')) + signal.signal(signal.SIGHUP, signal.SIG_IGN) + self.running = False + + signal.signal(signal.SIGTERM, kill_children) + signal.signal(signal.SIGINT, kill_children) + signal.signal(signal.SIGHUP, hup) + + while len(self.children) < self.workers: + self.run_child() + + self._logger.info(_("Started %(children_count)d worker for %(name)s") + % {'children_count': len(self.children), + 'name': self.name}) + + self.wait_on_children() + + def run_child(self): + try: + pid = os.fork() + except KeyboardInterrupt: + pass + if pid == 0: + signal.signal(signal.SIGHUP, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + # to avoid race condition that child receive signal before + # parent and is respawned + signal.signal(signal.SIGINT, signal.SIG_IGN) + try: + self.run_server_in_process() + except KeyboardInterrupt: + pass + self._logger.info(_('Child %d exiting normally') % os.getpid()) + return None + else: + self._logger.info(_('[%(name)s] Started worker (pid: %(pid)s)') % + {'name': self.name, + 'pid': pid}) + self.children.append(pid) + + def wait_on_children(self): + while self.running: + try: + pid, status = os.wait() + if os.WIFEXITED(status) or os.WIFSIGNALED(status): + self._logger.error(_('Dead worker %(pid)s') % locals()) + if pid in self.children: + self.children.remove(pid) + self.run_child() + except OSError, err: + if err.errno not in (errno.EINTR, errno.ECHILD): + raise + except KeyboardInterrupt: + self._logger.info(_('Caught keyboard interrupt. Exiting.')) + self.running = False + break + eventlet.greenio.shutdown_safe(self._socket) + self._socket.close() + self._logger.debug(_('Exited')) def stop(self): """Stop this server. 
@@ -106,7 +229,19 @@ class Server(object): """ LOG.info(_("Stopping WSGI server.")) - self._server.kill() + if self.workers > 0: + # set running state to false and kill all workers + self.running = False + for pid in self.children: + os.kill(pid, signal.SIGTERM) + self.children.remove(pid) + # now terminate master_worker + if self.master_worker.is_alive(): + self.master_worker.terminate() + else: + # Resize Pool to stop accepting new connection + self._pool.resize(0) + self._server.kill() def wait(self): """Block, until the server has stopped. @@ -117,9 +252,16 @@ class Server(object): """ try: - self._server.wait() + if self.workers and self.master_worker: + # for services enabled multi-process,a separate master_worker + # is already waiting + pass + else: + self._pool.waitall() except greenlet.GreenletExit: LOG.info(_("WSGI server has stopped.")) + except KeyboardInterrupt: + pass class Request(webob.Request):