Merge "Add multi-process support for API services"

This commit is contained in:
Jenkins
2012-06-28 20:24:25 +00:00
committed by Gerrit Code Review
9 changed files with 274 additions and 90 deletions

View File

@@ -28,7 +28,7 @@ continue attempting to launch the rest of the services.
"""
import eventlet
eventlet.monkey_patch()
eventlet.monkey_patch(os=False)
import os
import sys
@@ -54,25 +54,26 @@ if __name__ == '__main__':
flags.parse_args(sys.argv)
logging.setup()
utils.monkey_patch()
servers = []
launcher = service.ProcessLauncher()
# nova-api
for api in flags.FLAGS.enabled_apis:
try:
servers.append(service.WSGIService(api))
server = service.WSGIService(api)
launcher.launch_server(server, workers=server.workers or 1)
except (Exception, SystemExit):
LOG.exception(_('Failed to load %s') % '%s-api' % api)
for mod in [s3server, xvp_proxy]:
try:
servers.append(mod.get_wsgi_server())
launcher.launch_server(mod.get_wsgi_server())
except (Exception, SystemExit):
LOG.exception(_('Failed to load %s') % mod.__name__)
for binary in ['nova-compute', 'nova-volume',
'nova-network', 'nova-scheduler', 'nova-cert']:
try:
servers.append(service.Service.create(binary=binary))
launcher.launch_server(service.Service.create(binary=binary))
except (Exception, SystemExit):
LOG.exception(_('Failed to load %s'), binary)
service.serve(*servers)
service.wait()
launcher.wait()

View File

@@ -24,7 +24,7 @@ Starts both the EC2 and OpenStack APIs in separate greenthreads.
"""
import eventlet
eventlet.monkey_patch()
eventlet.monkey_patch(os=False)
import os
import sys
@@ -45,8 +45,8 @@ if __name__ == '__main__':
flags.parse_args(sys.argv)
logging.setup()
utils.monkey_patch()
servers = []
launcher = service.ProcessLauncher()
for api in flags.FLAGS.enabled_apis:
servers.append(service.WSGIService(api))
service.serve(*servers)
service.wait()
server = service.WSGIService(api)
launcher.launch_server(server, workers=server.workers or 1)
launcher.wait()

View File

@@ -20,7 +20,7 @@
"""Starter script for Nova EC2 API."""
import eventlet
eventlet.monkey_patch()
eventlet.monkey_patch(os=False)
import os
import sys
@@ -42,5 +42,5 @@ if __name__ == '__main__':
logging.setup()
utils.monkey_patch()
server = service.WSGIService('ec2')
service.serve(server)
service.serve(server, workers=server.workers)
service.wait()

View File

@@ -20,7 +20,7 @@
"""Starter script for Nova Metadata API."""
import eventlet
eventlet.monkey_patch()
eventlet.monkey_patch(os=False)
import os
import sys
@@ -42,5 +42,5 @@ if __name__ == '__main__':
logging.setup()
utils.monkey_patch()
server = service.WSGIService('metadata')
service.serve(server)
service.serve(server, workers=server.workers)
service.wait()

View File

@@ -20,7 +20,7 @@
"""Starter script for Nova OS API."""
import eventlet
eventlet.monkey_patch()
eventlet.monkey_patch(os=False)
import os
import sys
@@ -42,5 +42,5 @@ if __name__ == '__main__':
logging.setup()
utils.monkey_patch()
server = service.WSGIService('osapi_compute')
service.serve(server)
service.serve(server, workers=server.workers)
service.wait()

View File

@@ -20,7 +20,7 @@
"""Starter script for Nova OS API."""
import eventlet
eventlet.monkey_patch()
eventlet.monkey_patch(os=False)
import os
import sys
@@ -42,5 +42,5 @@ if __name__ == '__main__':
logging.setup()
utils.monkey_patch()
server = service.WSGIService('osapi_volume')
service.serve(server)
service.serve(server, workers=server.workers)
service.wait()

View File

@@ -19,10 +19,13 @@
"""Generic Node base class for all workers that run on hosts."""
import errno
import inspect
import os
import random
import signal
import sys
import time
import eventlet
import greenlet
@@ -61,12 +64,18 @@ service_opts = [
cfg.IntOpt('ec2_listen_port',
default=8773,
help='port for ec2 api to listen'),
cfg.IntOpt('ec2_workers',
default=None,
help='Number of workers for EC2 API service'),
cfg.StrOpt('osapi_compute_listen',
default="0.0.0.0",
help='IP address for OpenStack API to listen'),
cfg.IntOpt('osapi_compute_listen_port',
default=8774,
help='list port for osapi compute'),
cfg.IntOpt('osapi_compute_workers',
default=None,
help='Number of workers for OpenStack API service'),
cfg.StrOpt('metadata_manager',
default='nova.api.manager.MetadataManager',
help='OpenStack metadata service manager'),
@@ -76,12 +85,18 @@ service_opts = [
cfg.IntOpt('metadata_listen_port',
default=8775,
help='port for metadata api to listen'),
cfg.IntOpt('metadata_workers',
default=None,
help='Number of workers for metadata service'),
cfg.StrOpt('osapi_volume_listen',
default="0.0.0.0",
help='IP address for OpenStack Volume API to listen'),
cfg.IntOpt('osapi_volume_listen_port',
default=8776,
help='port for os volume api to listen')
help='port for os volume api to listen'),
cfg.IntOpt('osapi_volume_workers',
default=None,
help='Number of workers for OpenStack Volume API service'),
]
FLAGS = flags.FLAGS
@@ -98,6 +113,7 @@ class Launcher(object):
"""
self._services = []
eventlet_backdoor.initialize_if_enabled()
@staticmethod
def run_server(server):
@@ -135,15 +151,6 @@ class Launcher(object):
:returns: None
"""
def sigterm(sig, frame):
LOG.audit(_("SIGTERM received"))
# NOTE(jk0): Raise a ^C which is caught by the caller and cleanly
# shuts down the service. This does not yet handle eventlet
# threads.
raise KeyboardInterrupt
signal.signal(signal.SIGTERM, sigterm)
for service in self._services:
try:
service.wait()
@@ -151,6 +158,198 @@ class Launcher(object):
pass
class ServiceLauncher(Launcher):
    """Launcher that runs services in the current process.

    Installs SIGTERM/SIGINT handlers that turn the signal into a
    SystemExit, then stops the services and cleans up RPC on the way out.
    """

    def _handle_signal(self, signo, frame):
        """Log the received signal, restore default handlers and exit(1)."""
        signal_names = {signal.SIGTERM: 'SIGTERM', signal.SIGINT: 'SIGINT'}
        LOG.info(_('Caught %s, exiting'), signal_names[signo])

        # Allow the process to be killed again and die from natural causes
        for sig in (signal.SIGTERM, signal.SIGINT):
            signal.signal(sig, signal.SIG_DFL)

        sys.exit(1)

    def wait(self):
        """Dump the flags to the debug log, then wait on the services.

        A SystemExit raised while waiting (e.g. by the signal handler) is
        held back until the services are stopped and RPC is cleaned up, and
        is then re-issued with the same exit code.
        """
        signal.signal(signal.SIGTERM, self._handle_signal)
        signal.signal(signal.SIGINT, self._handle_signal)

        LOG.debug(_('Full set of FLAGS:'))
        for flag in FLAGS:
            flag_get = FLAGS.get(flag, None)
            # hide flag contents from log if contains a password
            # should use secret flag when switch over to openstack-common
            is_secret = ("_password" in flag or "_key" in flag or
                         (flag == "sql_connection" and "mysql:" in flag_get))
            if is_secret:
                LOG.debug(_('%(flag)s : FLAG SET ') % {'flag': flag})
            else:
                LOG.debug('%(flag)s : %(flag_get)s' %
                          {'flag': flag, 'flag_get': flag_get})

        exit_code = None
        try:
            super(ServiceLauncher, self).wait()
        except SystemExit as exc:
            exit_code = exc.code

        # Always shut down cleanly, whether wait() returned or exited.
        self.stop()
        rpc.cleanup()

        if exit_code is not None:
            sys.exit(exit_code)
class ServerWrapper(object):
    """Bookkeeping record pairing a server with its worker processes."""

    def __init__(self, server, workers):
        # The server object to run and how many worker processes it wants.
        self.server, self.workers = server, workers
        # Pids of the currently tracked children running this server.
        self.children = set()
        # Timestamps of recent forks for this server.
        self.forktimes = list()
class ProcessLauncher(object):
    """Run servers in forked worker processes and respawn them when they die.

    The parent process only forks, reaps and respawns children; each child
    runs its server through a plain ``Launcher``.
    """

    def __init__(self):
        """Create the parent/child pipe and install the signal handlers."""
        # Maps child pid -> ServerWrapper the child belongs to.
        self.children = {}
        self.running = True
        # Only the parent keeps the write end open; children watch the read
        # end to notice when the parent goes away (see _pipe_watcher).
        rfd, self.writepipe = os.pipe()
        self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')

        signal.signal(signal.SIGTERM, self._handle_signal)
        signal.signal(signal.SIGINT, self._handle_signal)

    @staticmethod
    def _exit_status(code):
        """Convert a ``SystemExit.code`` into an int usable by os._exit().

        Mirrors ``sys.exit()`` semantics: ``None`` means success (0), an
        integer is used as-is, and anything else (e.g. a message string)
        counts as failure (1).
        """
        if code is None:
            return 0
        if isinstance(code, int):
            return code
        return 1

    def _handle_signal(self, signo, frame):
        """Stop respawning and forward SIGTERM to every known child."""
        signame = {signal.SIGTERM: 'SIGTERM', signal.SIGINT: 'SIGINT'}[signo]
        LOG.info(_('Caught %s, stopping children'), signame)

        self.running = False
        for pid in self.children:
            try:
                os.kill(pid, signal.SIGTERM)
            except OSError as exc:
                # A child that is already gone is not an error.
                if exc.errno != errno.ESRCH:
                    raise

        # Allow the process to be killed again and die from natural causes
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGINT, signal.SIG_DFL)

    def _pipe_watcher(self):
        """Exit the (child) process once the parent's pipe end is closed."""
        # This will block until the write end is closed when the parent
        # dies unexpectedly
        self.readpipe.read()

        LOG.info(_('Parent process has died unexpectedly, exiting'))

        sys.exit(1)

    def _child_process(self, server):
        """Prepare the freshly forked child and run ``server`` in it."""
        # Setup child signal handlers differently
        def _sigterm(*args):
            LOG.info(_('Received SIGTERM, stopping'))
            signal.signal(signal.SIGTERM, signal.SIG_DFL)
            server.stop()

        signal.signal(signal.SIGTERM, _sigterm)
        # Block SIGINT and let the parent send us a SIGTERM
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        # Reopen the eventlet hub to make sure we don't share an epoll
        # fd with parent and/or siblings, which would be bad
        eventlet.hubs.use_hub()

        # Close write to ensure only parent has it open
        os.close(self.writepipe)
        # Create greenthread to watch for parent to close pipe
        eventlet.spawn(self._pipe_watcher)

        # Reseed random number generator
        random.seed()

        launcher = Launcher()
        launcher.run_server(server)

    def _start_child(self, wrap):
        """Fork one worker for ``wrap`` and return its pid (parent only).

        The child never returns from this method: it runs the server and
        then leaves through ``os._exit()``.
        """
        if len(wrap.forktimes) > wrap.workers:
            # Limit ourselves to one process a second (over the period of
            # number of workers * 1 second). This will allow workers to
            # start up quickly but ensure we don't fork off children that
            # die instantly too quickly.
            if time.time() - wrap.forktimes[0] < wrap.workers:
                LOG.info(_('Forking too fast, sleeping'))
                time.sleep(1)

            wrap.forktimes.pop(0)

        wrap.forktimes.append(time.time())

        pid = os.fork()
        if pid == 0:
            # NOTE(johannes): All exceptions are caught to ensure this
            # doesn't fallback into the loop spawning children. It would
            # be bad for a child to spawn more children.
            status = 0
            try:
                self._child_process(wrap.server)
            except SystemExit as exc:
                # exc.code may be None or a non-integer; os._exit() would
                # raise TypeError on those and the child would escape into
                # the parent's spawn loop, so normalise it first.
                status = self._exit_status(exc.code)
            except BaseException:
                LOG.exception(_('Unhandled exception'))
                status = 2

            os._exit(status)

        LOG.info(_('Started child %d'), pid)

        wrap.children.add(pid)
        self.children[pid] = wrap

        return pid

    def launch_server(self, server, workers=1):
        """Fork ``workers`` child processes, each running ``server``."""
        wrap = ServerWrapper(server, workers)

        LOG.info(_('Starting %d workers'), wrap.workers)
        while self.running and len(wrap.children) < wrap.workers:
            self._start_child(wrap)

    def _wait_child(self):
        """Reap one child and return its ServerWrapper.

        Returns None when the wait was interrupted, there are no children,
        or the reaped pid is not one this launcher started.
        """
        try:
            pid, status = os.wait()
        except OSError as exc:
            if exc.errno not in (errno.EINTR, errno.ECHILD):
                raise
            return None

        if os.WIFSIGNALED(status):
            sig = os.WTERMSIG(status)
            LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
                     {'pid': pid, 'sig': sig})
        else:
            code = os.WEXITSTATUS(status)
            LOG.info(_('Child %(pid)d exited with status %(code)d'),
                     {'pid': pid, 'code': code})

        if pid not in self.children:
            LOG.warning(_('pid %d not in child list'), pid)
            return None

        wrap = self.children.pop(pid)
        wrap.children.remove(pid)
        return wrap

    def wait(self):
        """Loop waiting on children to die and respawning as necessary"""
        # Loop calling wait and respawning as necessary
        while self.running:
            wrap = self._wait_child()
            if not wrap:
                continue

            while self.running and len(wrap.children) < wrap.workers:
                self._start_child(wrap)

        # Wait for children to die
        if self.children:
            LOG.info(_('Waiting on %d children to exit'), len(self.children))
            while self.children:
                self._wait_child()
class Service(object):
"""Service object for binaries running on hosts.
@@ -170,7 +369,6 @@ class Service(object):
self.report_interval = report_interval
self.periodic_interval = periodic_interval
self.periodic_fuzzy_delay = periodic_fuzzy_delay
super(Service, self).__init__(*args, **kwargs)
self.saved_args, self.saved_kwargs = args, kwargs
self.timers = []
@@ -361,10 +559,13 @@ class WSGIService(object):
self.app = self.loader.load_app(name)
self.host = getattr(FLAGS, '%s_listen' % name, "0.0.0.0")
self.port = getattr(FLAGS, '%s_listen_port' % name, 0)
self.workers = getattr(FLAGS, '%s_workers' % name, None)
self.server = wsgi.Server(name,
self.app,
host=self.host,
port=self.port)
# Pull back actual port used
self.port = self.server.port
def _get_manager(self):
"""Initialize a Manager object appropriate for this service.
@@ -400,7 +601,6 @@ class WSGIService(object):
if self.manager:
self.manager.init_host()
self.server.start()
self.port = self.server.port
def stop(self):
"""Stop serving this API.
@@ -425,29 +625,18 @@ class WSGIService(object):
_launcher = None
def serve(*servers):
def serve(server, workers=None):
global _launcher
if not _launcher:
_launcher = Launcher()
for server in servers:
_launcher.launch_server(server)
if _launcher:
raise RuntimeError(_('serve() can only be called once'))
eventlet_backdoor.initialize_if_enabled()
if workers:
_launcher = ProcessLauncher()
_launcher.launch_server(server, workers=workers)
else:
_launcher = ServiceLauncher()
_launcher.launch_server(server)
def wait():
LOG.debug(_('Full set of FLAGS:'))
for flag in FLAGS:
flag_get = FLAGS.get(flag, None)
# hide flag contents from log if contains a password
# should use secret flag when switch over to openstack-common
if ("_password" in flag or "_key" in flag or
(flag == "sql_connection" and "mysql:" in flag_get)):
LOG.debug(_('%(flag)s : FLAG SET ') % locals())
else:
LOG.debug('%(flag)s : %(flag_get)s' % locals())
try:
_launcher.wait()
except KeyboardInterrupt:
_launcher.stop()
rpc.cleanup()
_launcher.wait()

View File

@@ -44,7 +44,7 @@ from nova import log as logging
import eventlet
eventlet.monkey_patch()
eventlet.monkey_patch(os=False)
FLAGS = flags.FLAGS
FLAGS.use_stderr = False

View File

@@ -44,8 +44,8 @@ class Server(object):
default_pool_size = 1000
def __init__(self, name, app, host=None, port=None, pool_size=None,
protocol=eventlet.wsgi.HttpProtocol):
def __init__(self, name, app, host='0.0.0.0', port=0, pool_size=None,
protocol=eventlet.wsgi.HttpProtocol, backlog=128):
"""Initialize, but do not start, a WSGI server.
:param name: Pretty name for logging.
@@ -53,47 +53,37 @@ class Server(object):
:param host: IP address to serve the application.
:param port: Port number to server the application.
:param pool_size: Maximum number of eventlets to spawn concurrently.
:returns: None
"""
self.name = name
self.app = app
self.host = host or "0.0.0.0"
self.port = port or 0
self._server = None
self._socket = None
self._protocol = protocol
self._pool = eventlet.GreenPool(pool_size or self.default_pool_size)
self._logger = logging.getLogger("eventlet.wsgi.server")
self._wsgi_logger = logging.WritableLogger(self._logger)
def _start(self):
"""Run the blocking eventlet WSGI server.
:returns: None
"""
eventlet.wsgi.server(self._socket,
self.app,
protocol=self._protocol,
custom_pool=self._pool,
log=self._wsgi_logger)
def start(self, backlog=128):
"""Start serving a WSGI application.
:param backlog: Maximum number of queued connections.
:returns: None
:raises: nova.exception.InvalidInput
"""
self.name = name
self.app = app
self._server = None
self._protocol = protocol
self._pool = eventlet.GreenPool(pool_size or self.default_pool_size)
self._logger = logging.getLogger("nova.%s.wsgi.server" % self.name)
self._wsgi_logger = logging.WritableLogger(self._logger)
if backlog < 1:
raise exception.InvalidInput(
reason='The backlog must be more than 1')
self._socket = eventlet.listen((self.host, self.port), backlog=backlog)
self._server = eventlet.spawn(self._start)
self._socket = eventlet.listen((host, port), backlog=backlog)
(self.host, self.port) = self._socket.getsockname()
LOG.info(_("Started %(name)s on %(host)s:%(port)s") % self.__dict__)
LOG.info(_("%(name)s listening on %(host)s:%(port)s") % self.__dict__)
def start(self):
"""Start serving a WSGI application.
:returns: None
"""
self._server = eventlet.spawn(eventlet.wsgi.server,
self._socket,
self.app,
protocol=self._protocol,
custom_pool=self._pool,
log=self._wsgi_logger)
def stop(self):
"""Stop this server.
@@ -105,7 +95,11 @@ class Server(object):
"""
LOG.info(_("Stopping WSGI server."))
self._server.kill()
if self._server is not None:
# Resize pool to stop new requests from being processed
self._pool.resize(0)
self._server.kill()
def wait(self):
"""Block, until the server has stopped.