Merge "Add multi-process support for API services"

This commit is contained in:
Jenkins 2012-06-28 20:24:25 +00:00 committed by Gerrit Code Review
commit 96d5c1ef8c
13 changed files with 458 additions and 97 deletions

View File

@ -28,7 +28,7 @@ continue attempting to launch the rest of the services.
"""
import eventlet
eventlet.monkey_patch()
eventlet.monkey_patch(os=False)
import os
import sys
@ -54,25 +54,26 @@ if __name__ == '__main__':
flags.parse_args(sys.argv)
logging.setup()
utils.monkey_patch()
servers = []
launcher = service.ProcessLauncher()
# nova-api
for api in flags.FLAGS.enabled_apis:
try:
servers.append(service.WSGIService(api))
server = service.WSGIService(api)
launcher.launch_server(server, workers=server.workers or 1)
except (Exception, SystemExit):
LOG.exception(_('Failed to load %s') % '%s-api' % api)
for mod in [s3server, xvp_proxy]:
try:
servers.append(mod.get_wsgi_server())
launcher.launch_server(mod.get_wsgi_server())
except (Exception, SystemExit):
LOG.exception(_('Failed to load %s') % mod.__name__)
for binary in ['nova-compute', 'nova-volume',
'nova-network', 'nova-scheduler', 'nova-cert']:
try:
servers.append(service.Service.create(binary=binary))
launcher.launch_server(service.Service.create(binary=binary))
except (Exception, SystemExit):
LOG.exception(_('Failed to load %s'), binary)
service.serve(*servers)
service.wait()
launcher.wait()

View File

@ -24,7 +24,7 @@ Starts both the EC2 and OpenStack APIs in separate greenthreads.
"""
import eventlet
eventlet.monkey_patch()
eventlet.monkey_patch(os=False)
import os
import sys
@ -45,8 +45,8 @@ if __name__ == '__main__':
flags.parse_args(sys.argv)
logging.setup()
utils.monkey_patch()
servers = []
launcher = service.ProcessLauncher()
for api in flags.FLAGS.enabled_apis:
servers.append(service.WSGIService(api))
service.serve(*servers)
service.wait()
server = service.WSGIService(api)
launcher.launch_server(server, workers=server.workers or 1)
launcher.wait()

View File

@ -20,7 +20,7 @@
"""Starter script for Nova EC2 API."""
import eventlet
eventlet.monkey_patch()
eventlet.monkey_patch(os=False)
import os
import sys
@ -42,5 +42,5 @@ if __name__ == '__main__':
logging.setup()
utils.monkey_patch()
server = service.WSGIService('ec2')
service.serve(server)
service.serve(server, workers=server.workers)
service.wait()

View File

@ -20,7 +20,7 @@
"""Starter script for Nova Metadata API."""
import eventlet
eventlet.monkey_patch()
eventlet.monkey_patch(os=False)
import os
import sys
@ -42,5 +42,5 @@ if __name__ == '__main__':
logging.setup()
utils.monkey_patch()
server = service.WSGIService('metadata')
service.serve(server)
service.serve(server, workers=server.workers)
service.wait()

View File

@ -20,7 +20,7 @@
"""Starter script for Nova OS API."""
import eventlet
eventlet.monkey_patch()
eventlet.monkey_patch(os=False)
import os
import sys
@ -42,5 +42,5 @@ if __name__ == '__main__':
logging.setup()
utils.monkey_patch()
server = service.WSGIService('osapi_compute')
service.serve(server)
service.serve(server, workers=server.workers)
service.wait()

View File

@ -20,7 +20,7 @@
"""Starter script for Nova OS API."""
import eventlet
eventlet.monkey_patch()
eventlet.monkey_patch(os=False)
import os
import sys
@ -42,5 +42,5 @@ if __name__ == '__main__':
logging.setup()
utils.monkey_patch()
server = service.WSGIService('osapi_volume')
service.serve(server)
service.serve(server, workers=server.workers)
service.wait()

View File

@ -489,12 +489,18 @@
# ec2_listen_port=8773
#### (IntOpt) port for ec2 api to listen
# ec2_workers=0
#### (IntOpt) Number of EC2 API workers
# osapi_compute_listen=0.0.0.0
#### (StrOpt) IP address for OpenStack API to listen
# osapi_compute_listen_port=8774
#### (IntOpt) list port for osapi compute
# osapi_compute_workers=0
#### (IntOpt) Number of workers for OpenStack API
# metadata_manager=nova.api.manager.MetadataManager
#### (StrOpt) OpenStack metadata service manager
@ -504,12 +510,18 @@
# metadata_listen_port=8775
#### (IntOpt) port for metadata api to listen
# metadata_workers=0
#### (IntOpt) Number of workers for metadata API
# osapi_volume_listen=0.0.0.0
#### (StrOpt) IP address for OpenStack Volume API to listen
# osapi_volume_listen_port=8776
#### (IntOpt) port for os volume api to listen
# osapi_volume_workers=0
#### (IntOpt) Number of workers for OpenStack Volume API
######## defined in nova.test ########

View File

@ -19,10 +19,13 @@
"""Generic Node base class for all workers that run on hosts."""
import errno
import inspect
import os
import random
import signal
import sys
import time
import eventlet
import greenlet
@ -61,12 +64,18 @@ service_opts = [
cfg.IntOpt('ec2_listen_port',
default=8773,
help='port for ec2 api to listen'),
cfg.IntOpt('ec2_workers',
default=None,
help='Number of workers for EC2 API service'),
cfg.StrOpt('osapi_compute_listen',
default="0.0.0.0",
help='IP address for OpenStack API to listen'),
cfg.IntOpt('osapi_compute_listen_port',
default=8774,
help='list port for osapi compute'),
cfg.IntOpt('osapi_compute_workers',
default=None,
help='Number of workers for OpenStack API service'),
cfg.StrOpt('metadata_manager',
default='nova.api.manager.MetadataManager',
help='OpenStack metadata service manager'),
@ -76,12 +85,18 @@ service_opts = [
cfg.IntOpt('metadata_listen_port',
default=8775,
help='port for metadata api to listen'),
cfg.IntOpt('metadata_workers',
default=None,
help='Number of workers for metadata service'),
cfg.StrOpt('osapi_volume_listen',
default="0.0.0.0",
help='IP address for OpenStack Volume API to listen'),
cfg.IntOpt('osapi_volume_listen_port',
default=8776,
help='port for os volume api to listen')
help='port for os volume api to listen'),
cfg.IntOpt('osapi_volume_workers',
default=None,
help='Number of workers for OpenStack Volume API service'),
]
FLAGS = flags.FLAGS
@ -98,6 +113,7 @@ class Launcher(object):
"""
self._services = []
eventlet_backdoor.initialize_if_enabled()
@staticmethod
def run_server(server):
@ -135,15 +151,6 @@ class Launcher(object):
:returns: None
"""
def sigterm(sig, frame):
LOG.audit(_("SIGTERM received"))
# NOTE(jk0): Raise a ^C which is caught by the caller and cleanly
# shuts down the service. This does not yet handle eventlet
# threads.
raise KeyboardInterrupt
signal.signal(signal.SIGTERM, sigterm)
for service in self._services:
try:
service.wait()
@ -151,6 +158,198 @@ class Launcher(object):
pass
class ServiceLauncher(Launcher):
def _handle_signal(self, signo, frame):
signame = {signal.SIGTERM: 'SIGTERM', signal.SIGINT: 'SIGINT'}[signo]
LOG.info(_('Caught %s, exiting'), signame)
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
sys.exit(1)
def wait(self):
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
LOG.debug(_('Full set of FLAGS:'))
for flag in FLAGS:
flag_get = FLAGS.get(flag, None)
# hide flag contents from log if contains a password
# should use secret flag when switch over to openstack-common
if ("_password" in flag or "_key" in flag or
(flag == "sql_connection" and "mysql:" in flag_get)):
LOG.debug(_('%(flag)s : FLAG SET ') % locals())
else:
LOG.debug('%(flag)s : %(flag_get)s' % locals())
status = None
try:
super(ServiceLauncher, self).wait()
except SystemExit as exc:
status = exc.code
self.stop()
rpc.cleanup()
if status is not None:
sys.exit(status)
class ServerWrapper(object):
def __init__(self, server, workers):
self.server = server
self.workers = workers
self.children = set()
self.forktimes = []
class ProcessLauncher(object):
def __init__(self):
self.children = {}
self.running = True
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
def _handle_signal(self, signo, frame):
signame = {signal.SIGTERM: 'SIGTERM', signal.SIGINT: 'SIGINT'}[signo]
LOG.info(_('Caught %s, stopping children'), signame)
self.running = False
for pid in self.children:
try:
os.kill(pid, signal.SIGTERM)
except OSError as exc:
if exc.errno != errno.ESRCH:
raise
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
# dies unexpectedly
self.readpipe.read()
LOG.info(_('Parent process has died unexpectedly, exiting'))
sys.exit(1)
def _child_process(self, server):
# Setup child signal handlers differently
def _sigterm(*args):
LOG.info(_('Received SIGTERM, stopping'))
signal.signal(signal.SIGTERM, signal.SIG_DFL)
server.stop()
signal.signal(signal.SIGTERM, _sigterm)
# Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN)
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
# Close write to ensure only parent has it open
os.close(self.writepipe)
# Create greenthread to watch for parent to close pipe
eventlet.spawn(self._pipe_watcher)
# Reseed random number generator
random.seed()
launcher = Launcher()
launcher.run_server(server)
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
# Limit ourselves to one process a second (over the period of
# number of workers * 1 second). This will allow workers to
# start up quickly but ensure we don't fork off children that
# die instantly too quickly.
if time.time() - wrap.forktimes[0] < wrap.workers:
LOG.info(_('Forking too fast, sleeping'))
time.sleep(1)
wrap.forktimes.pop(0)
wrap.forktimes.append(time.time())
pid = os.fork()
if pid == 0:
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children.
status = 0
try:
self._child_process(wrap.server)
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_('Unhandled exception'))
status = 2
os._exit(status)
LOG.info(_('Started child %d'), pid)
wrap.children.add(pid)
self.children[pid] = wrap
return pid
def launch_server(self, server, workers=1):
wrap = ServerWrapper(server, workers)
LOG.info(_('Starting %d workers'), wrap.workers)
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
def _wait_child(self):
try:
pid, status = os.wait()
except OSError as exc:
if exc.errno not in (errno.EINTR, errno.ECHILD):
raise
return None
if os.WIFSIGNALED(status):
sig = os.WTERMSIG(status)
LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals())
else:
code = os.WEXITSTATUS(status)
LOG.info(_('Child %(pid)d exited with status %(code)d'), locals())
if pid not in self.children:
LOG.warning(_('pid %d not in child list'), pid)
return None
wrap = self.children.pop(pid)
wrap.children.remove(pid)
return wrap
def wait(self):
"""Loop waiting on children to die and respawning as necessary"""
# Loop calling wait and respawning as necessary
while self.running:
wrap = self._wait_child()
if not wrap:
continue
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
# Wait for children to die
if self.children:
LOG.info(_('Waiting on %d children to exit'), len(self.children))
while self.children:
self._wait_child()
class Service(object):
"""Service object for binaries running on hosts.
@ -170,7 +369,6 @@ class Service(object):
self.report_interval = report_interval
self.periodic_interval = periodic_interval
self.periodic_fuzzy_delay = periodic_fuzzy_delay
super(Service, self).__init__(*args, **kwargs)
self.saved_args, self.saved_kwargs = args, kwargs
self.timers = []
@ -361,10 +559,13 @@ class WSGIService(object):
self.app = self.loader.load_app(name)
self.host = getattr(FLAGS, '%s_listen' % name, "0.0.0.0")
self.port = getattr(FLAGS, '%s_listen_port' % name, 0)
self.workers = getattr(FLAGS, '%s_workers' % name, None)
self.server = wsgi.Server(name,
self.app,
host=self.host,
port=self.port)
# Pull back actual port used
self.port = self.server.port
def _get_manager(self):
"""Initialize a Manager object appropriate for this service.
@ -400,7 +601,6 @@ class WSGIService(object):
if self.manager:
self.manager.init_host()
self.server.start()
self.port = self.server.port
def stop(self):
"""Stop serving this API.
@ -425,29 +625,18 @@ class WSGIService(object):
_launcher = None
def serve(*servers):
def serve(server, workers=None):
global _launcher
if not _launcher:
_launcher = Launcher()
for server in servers:
_launcher.launch_server(server)
if _launcher:
raise RuntimeError(_('serve() can only be called once'))
eventlet_backdoor.initialize_if_enabled()
if workers:
_launcher = ProcessLauncher()
_launcher.launch_server(server, workers=workers)
else:
_launcher = ServiceLauncher()
_launcher.launch_server(server)
def wait():
LOG.debug(_('Full set of FLAGS:'))
for flag in FLAGS:
flag_get = FLAGS.get(flag, None)
# hide flag contents from log if contains a password
# should use secret flag when switch over to openstack-common
if ("_password" in flag or "_key" in flag or
(flag == "sql_connection" and "mysql:" in flag_get)):
LOG.debug(_('%(flag)s : FLAG SET ') % locals())
else:
LOG.debug('%(flag)s : %(flag_get)s' % locals())
try:
_launcher.wait()
except KeyboardInterrupt:
_launcher.stop()
rpc.cleanup()
_launcher.wait()

View File

@ -44,7 +44,7 @@ from nova import log as logging
import eventlet
eventlet.monkey_patch()
eventlet.monkey_patch(os=False)
FLAGS = flags.FLAGS
FLAGS.use_stderr = False

View File

@ -0,0 +1,169 @@
# Copyright (c) 2012 Intel, LLC
# Copyright (c) 2012 OpenStack, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test multiprocess enabled API service.
"""
import os
import signal
import time
import traceback
from nova import flags
from nova.log import logging
from nova import service
from nova.tests.integrated import integrated_helpers
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
class MultiprocessWSGITest(integrated_helpers._IntegratedTestBase):
def _start_api_service(self):
# Process will be started in _spawn()
self.osapi = service.WSGIService("osapi_compute")
self.auth_url = 'http://%s:%s/v2' % (self.osapi.host, self.osapi.port)
LOG.info('auth_url = %s' % self.auth_url)
def _get_flags(self):
self.workers = 2
f = super(MultiprocessWSGITest, self)._get_flags()
f['osapi_compute_workers'] = self.workers
return f
def _spawn(self):
pid = os.fork()
if pid == 0:
# NOTE(johannes): We can't let the child processes exit back
# into the unit test framework since then we'll have multiple
# processes running the same tests (and possibly forking more
# processes that end up in the same situation). So we need
# to catch all exceptions and make sure nothing leaks out, in
# particlar SystemExit, which is raised by sys.exit(). We use
# os._exit() which doesn't have this problem.
status = 0
try:
launcher = service.ProcessLauncher()
launcher.launch_server(self.osapi, workers=self.osapi.workers)
launcher.wait()
except SystemExit as exc:
status = exc.code
except BaseException:
# We need to be defensive here too
try:
traceback.print_exc()
except BaseException:
print "Couldn't print traceback"
status = 2
# Really exit
os._exit(status)
self.pid = pid
# Wait for up to a second for workers to get started
start = time.time()
while time.time() - start < 1:
workers = self._get_workers()
if len(workers) == self.workers:
break
time.sleep(.1)
self.assertEqual(len(workers), self.workers)
return workers
def tearDown(self):
if self.pid:
# Make sure all processes are stopped
os.kill(self.pid, signal.SIGTERM)
# Make sure we reap our test process
self._reap_test()
super(MultiprocessWSGITest, self).tearDown()
def _reap_test(self):
pid, status = os.waitpid(self.pid, 0)
self.pid = None
return status
def _get_workers(self):
f = os.popen('ps ax -o pid,ppid,command')
# Skip ps header
f.readline()
processes = [tuple(int(p) for p in l.strip().split()[:2])
for l in f.readlines()]
return [p for p, pp in processes if pp == self.pid]
def test_killed_worker_recover(self):
start_workers = self._spawn()
# kill one worker and check if new worker can come up
LOG.info('pid of first child is %s' % start_workers[0])
os.kill(start_workers[0], signal.SIGTERM)
# loop and check if new worker is spawned (for 1 second max)
start = time.time()
while time.time() - start < 1:
end_workers = self._get_workers()
LOG.info('workers: %r' % end_workers)
if start_workers != end_workers:
break
time.sleep(.1)
# Make sure worker pids don't match
self.assertNotEqual(start_workers, end_workers)
# check if api service still works
flavors = self.api.get_flavors()
self.assertTrue(len(flavors) > 0, 'Num of flavors > 0.')
def _terminate_with_signal(self, sig):
self._spawn()
# check if api service is working
flavors = self.api.get_flavors()
self.assertTrue(len(flavors) > 0, 'Num of flavors > 0.')
os.kill(self.pid, sig)
# loop and check if all processes are killed (for 1 second max)
start = time.time()
while time.time() - start < 1:
workers = self._get_workers()
LOG.info('workers: %r' % workers)
if not workers:
break
time.sleep(.1)
self.assertFalse(workers, 'No OS processes left.')
def test_terminate_sigkill(self):
self._terminate_with_signal(signal.SIGKILL)
status = self._reap_test()
self.assertTrue(os.WIFSIGNALED(status))
self.assertEqual(os.WTERMSIG(status), signal.SIGKILL)
def test_terminate_sigterm(self):
self._terminate_with_signal(signal.SIGTERM)
status = self._reap_test()
self.assertTrue(os.WIFEXITED(status))
self.assertEqual(os.WEXITSTATUS(status), 0)

View File

@ -40,7 +40,7 @@ test_service_opts = [
default="nova.tests.test_service.FakeManager",
help="Manager for testing"),
cfg.StrOpt("test_service_listen",
default=None,
default='127.0.0.1',
help="Host to bind test service to"),
cfg.IntOpt("test_service_listen_port",
default=0,
@ -202,7 +202,6 @@ class TestWSGIService(test.TestCase):
def test_service_random_port(self):
test_service = service.WSGIService("test_service")
self.assertEquals(0, test_service.port)
test_service.start()
self.assertNotEqual(0, test_service.port)
test_service.stop()
@ -216,10 +215,7 @@ class TestLauncher(test.TestCase):
self.service = service.WSGIService("test_service")
def test_launch_app(self):
self.assertEquals(0, self.service.port)
launcher = service.Launcher()
launcher.launch_server(self.service)
# Give spawned thread a chance to execute
greenthread.sleep(0)
self.assertNotEquals(0, self.service.port)
launcher.stop()

View File

@ -84,8 +84,8 @@ class TestWSGIServer(unittest.TestCase):
self.assertEquals("test_app", server.name)
def test_start_random_port(self):
server = nova.wsgi.Server("test_random_port", None, host="127.0.0.1")
self.assertEqual(0, server.port)
server = nova.wsgi.Server("test_random_port", None,
host="127.0.0.1", port=0)
server.start()
self.assertNotEqual(0, server.port)
server.stop()

View File

@ -44,8 +44,8 @@ class Server(object):
default_pool_size = 1000
def __init__(self, name, app, host=None, port=None, pool_size=None,
protocol=eventlet.wsgi.HttpProtocol):
def __init__(self, name, app, host='0.0.0.0', port=0, pool_size=None,
protocol=eventlet.wsgi.HttpProtocol, backlog=128):
"""Initialize, but do not start, a WSGI server.
:param name: Pretty name for logging.
@ -53,47 +53,37 @@ class Server(object):
:param host: IP address to serve the application.
:param port: Port number to server the application.
:param pool_size: Maximum number of eventlets to spawn concurrently.
:returns: None
"""
self.name = name
self.app = app
self.host = host or "0.0.0.0"
self.port = port or 0
self._server = None
self._socket = None
self._protocol = protocol
self._pool = eventlet.GreenPool(pool_size or self.default_pool_size)
self._logger = logging.getLogger("eventlet.wsgi.server")
self._wsgi_logger = logging.WritableLogger(self._logger)
def _start(self):
"""Run the blocking eventlet WSGI server.
:returns: None
"""
eventlet.wsgi.server(self._socket,
self.app,
protocol=self._protocol,
custom_pool=self._pool,
log=self._wsgi_logger)
def start(self, backlog=128):
"""Start serving a WSGI application.
:param backlog: Maximum number of queued connections.
:returns: None
:raises: nova.exception.InvalidInput
"""
self.name = name
self.app = app
self._server = None
self._protocol = protocol
self._pool = eventlet.GreenPool(pool_size or self.default_pool_size)
self._logger = logging.getLogger("nova.%s.wsgi.server" % self.name)
self._wsgi_logger = logging.WritableLogger(self._logger)
if backlog < 1:
raise exception.InvalidInput(
reason='The backlog must be more than 1')
self._socket = eventlet.listen((self.host, self.port), backlog=backlog)
self._server = eventlet.spawn(self._start)
self._socket = eventlet.listen((host, port), backlog=backlog)
(self.host, self.port) = self._socket.getsockname()
LOG.info(_("Started %(name)s on %(host)s:%(port)s") % self.__dict__)
LOG.info(_("%(name)s listening on %(host)s:%(port)s") % self.__dict__)
def start(self):
"""Start serving a WSGI application.
:returns: None
"""
self._server = eventlet.spawn(eventlet.wsgi.server,
self._socket,
self.app,
protocol=self._protocol,
custom_pool=self._pool,
log=self._wsgi_logger)
def stop(self):
"""Stop this server.
@ -105,7 +95,11 @@ class Server(object):
"""
LOG.info(_("Stopping WSGI server."))
self._server.kill()
if self._server is not None:
# Resize pool to stop new requests from being processed
self._pool.resize(0)
self._server.kill()
def wait(self):
"""Block, until the server has stopped.