Merge "Enable multi-process for API service"
This commit is contained in:
commit
533c21129a
@ -54,20 +54,20 @@ def main():
|
||||
LOG = log.getLogger('manila.all')
|
||||
|
||||
utils.monkey_patch()
|
||||
servers = []
|
||||
launcher = service.process_launcher()
|
||||
# manila-api
|
||||
try:
|
||||
servers.append(service.WSGIService('osapi_share'))
|
||||
server = service.WSGIService('osapi_share')
|
||||
launcher.launch_service(server, workers=server.workers or 1)
|
||||
except (Exception, SystemExit):
|
||||
LOG.exception(_LE('Failed to load osapi_share'))
|
||||
|
||||
for binary in ['manila-share', 'manila-scheduler', 'manila-api']:
|
||||
try:
|
||||
servers.append(service.Service.create(binary=binary))
|
||||
launcher.launch_service(service.Service.create(binary=binary))
|
||||
except (Exception, SystemExit):
|
||||
LOG.exception(_LE('Failed to load %s'), binary)
|
||||
service.serve(*servers)
|
||||
service.wait()
|
||||
launcher.wait()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
@ -18,10 +18,6 @@
|
||||
|
||||
"""Starter script for manila OS API."""
|
||||
|
||||
# NOTE(jdg): If we port over multi worker code from Nova
|
||||
# we'll need to set monkey_patch(os=False), unless
|
||||
# eventlet is updated/released to fix the root issue
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
|
||||
@ -48,9 +44,11 @@ def main():
|
||||
config.verify_share_protocols()
|
||||
log.setup(CONF, "manila")
|
||||
utils.monkey_patch()
|
||||
|
||||
launcher = service.process_launcher()
|
||||
server = service.WSGIService('osapi_share')
|
||||
service.serve(server)
|
||||
service.wait()
|
||||
launcher.launch_service(server, workers=server.workers or 1)
|
||||
launcher.wait()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
@ -42,17 +42,17 @@ def main():
|
||||
version=version.version_string())
|
||||
log.setup(CONF, "manila")
|
||||
utils.monkey_patch()
|
||||
launcher = service.ProcessLauncher()
|
||||
launcher = service.process_launcher()
|
||||
if CONF.enabled_share_backends:
|
||||
for backend in CONF.enabled_share_backends:
|
||||
host = "%s@%s" % (CONF.host, backend)
|
||||
server = service.Service.create(host=host,
|
||||
service_name=backend,
|
||||
binary='manila-share')
|
||||
launcher.launch_server(server)
|
||||
launcher.launch_service(server)
|
||||
else:
|
||||
server = service.Service.create(binary='manila-share')
|
||||
launcher.launch_server(server)
|
||||
launcher.launch_service(server)
|
||||
launcher.wait()
|
||||
|
||||
|
||||
|
@ -17,20 +17,15 @@
|
||||
|
||||
"""Generic Node base class for all workers that run on hosts."""
|
||||
|
||||
import errno
|
||||
import inspect
|
||||
import os
|
||||
import random
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
|
||||
import eventlet
|
||||
import greenlet
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import oslo_messaging as messaging
|
||||
from oslo_service import loopingcall
|
||||
from oslo_service import service
|
||||
from oslo_utils import importutils
|
||||
|
||||
from manila import context
|
||||
@ -62,271 +57,17 @@ service_opts = [
|
||||
help='IP address for OpenStack Share API to listen on.'),
|
||||
cfg.IntOpt('osapi_share_listen_port',
|
||||
default=8786,
|
||||
help='Port for OpenStack Share API to listen on.'), ]
|
||||
help='Port for OpenStack Share API to listen on.'),
|
||||
cfg.IntOpt('osapi_share_workers',
|
||||
default=1,
|
||||
help='Number of workers for OpenStack Share API service.'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(service_opts)
|
||||
|
||||
|
||||
class SignalExit(SystemExit):
|
||||
def __init__(self, signo, exccode=1):
|
||||
super(SignalExit, self).__init__(exccode)
|
||||
self.signo = signo
|
||||
|
||||
|
||||
class Launcher(object):
|
||||
"""Launch one or more services and wait for them to complete."""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the service launcher.
|
||||
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
self._services = []
|
||||
|
||||
@staticmethod
|
||||
def run_server(server):
|
||||
"""Start and wait for a server to finish.
|
||||
|
||||
:param service: Server to run and wait for.
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
server.start()
|
||||
server.wait()
|
||||
|
||||
def launch_server(self, server):
|
||||
"""Load and start the given server.
|
||||
|
||||
:param server: The server you would like to start.
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
gt = eventlet.spawn(self.run_server, server)
|
||||
self._services.append(gt)
|
||||
|
||||
def stop(self):
|
||||
"""Stop all services which are currently running.
|
||||
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
for service in self._services:
|
||||
service.kill()
|
||||
|
||||
def wait(self):
|
||||
"""Waits until all services have been stopped, and then returns.
|
||||
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
def sigterm(sig, frame):
|
||||
LOG.info(_LI("SIGTERM received"))
|
||||
# NOTE(jk0): Raise a ^C which is caught by the caller and cleanly
|
||||
# shuts down the service. This does not yet handle eventlet
|
||||
# threads.
|
||||
raise KeyboardInterrupt
|
||||
|
||||
signal.signal(signal.SIGTERM, sigterm)
|
||||
|
||||
for service in self._services:
|
||||
try:
|
||||
service.wait()
|
||||
except greenlet.GreenletExit:
|
||||
pass
|
||||
|
||||
|
||||
class ServerWrapper(object):
|
||||
def __init__(self, server, workers):
|
||||
self.server = server
|
||||
self.workers = workers
|
||||
self.children = set()
|
||||
self.forktimes = []
|
||||
self.failed = False
|
||||
|
||||
|
||||
class ProcessLauncher(object):
|
||||
def __init__(self):
|
||||
self.children = {}
|
||||
self.sigcaught = None
|
||||
self.totalwrap = 0
|
||||
self.failedwrap = 0
|
||||
self.running = True
|
||||
rfd, self.writepipe = os.pipe()
|
||||
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
|
||||
|
||||
signal.signal(signal.SIGTERM, self._handle_signal)
|
||||
signal.signal(signal.SIGINT, self._handle_signal)
|
||||
|
||||
def _handle_signal(self, signo, frame):
|
||||
self.sigcaught = signo
|
||||
self.running = False
|
||||
|
||||
# Allow the process to be killed again and die from natural causes
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
signal.signal(signal.SIGINT, signal.SIG_DFL)
|
||||
|
||||
def _pipe_watcher(self):
|
||||
# This will block until the write end is closed when the parent
|
||||
# dies unexpectedly
|
||||
self.readpipe.read()
|
||||
|
||||
LOG.info(_LI('Parent process has died unexpectedly, exiting'))
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
def _child_process(self, server):
|
||||
# Setup child signal handlers differently
|
||||
def _sigterm(*args):
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
raise SignalExit(signal.SIGTERM)
|
||||
|
||||
signal.signal(signal.SIGTERM, _sigterm)
|
||||
# Block SIGINT and let the parent send us a SIGTERM
|
||||
# signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
# This differs from the behavior in nova in that we dont ignore this
|
||||
# It allows the non-wsgi services to be terminated properly
|
||||
signal.signal(signal.SIGINT, _sigterm)
|
||||
|
||||
# Reopen the eventlet hub to make sure we don't share an epoll
|
||||
# fd with parent and/or siblings, which would be bad
|
||||
eventlet.hubs.use_hub()
|
||||
|
||||
# Close write to ensure only parent has it open
|
||||
os.close(self.writepipe)
|
||||
# Create greenthread to watch for parent to close pipe
|
||||
eventlet.spawn(self._pipe_watcher)
|
||||
|
||||
# Reseed random number generator
|
||||
random.seed()
|
||||
|
||||
launcher = Launcher()
|
||||
launcher.run_server(server)
|
||||
|
||||
def _start_child(self, wrap):
|
||||
if len(wrap.forktimes) > wrap.workers:
|
||||
# Limit ourselves to one process a second (over the period of
|
||||
# number of workers * 1 second). This will allow workers to
|
||||
# start up quickly but ensure we don't fork off children that
|
||||
# die instantly too quickly.
|
||||
if time.time() - wrap.forktimes[0] < wrap.workers:
|
||||
LOG.info(_LI('Forking too fast, sleeping'))
|
||||
time.sleep(1)
|
||||
|
||||
wrap.forktimes.pop(0)
|
||||
|
||||
wrap.forktimes.append(time.time())
|
||||
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
# NOTE(johannes): All exceptions are caught to ensure this
|
||||
# doesn't fallback into the loop spawning children. It would
|
||||
# be bad for a child to spawn more children.
|
||||
status = 0
|
||||
try:
|
||||
self._child_process(wrap.server)
|
||||
except SignalExit as exc:
|
||||
signame = {signal.SIGTERM: 'SIGTERM',
|
||||
signal.SIGINT: 'SIGINT'}[exc.signo]
|
||||
LOG.info(_LI('Caught %s, exiting'), signame)
|
||||
status = exc.code
|
||||
except SystemExit as exc:
|
||||
status = exc.code
|
||||
except BaseException:
|
||||
LOG.exception(_LE('Unhandled exception'))
|
||||
status = 2
|
||||
finally:
|
||||
wrap.server.stop()
|
||||
|
||||
os._exit(status)
|
||||
|
||||
LOG.info(_LI('Started child %d'), pid)
|
||||
|
||||
wrap.children.add(pid)
|
||||
self.children[pid] = wrap
|
||||
|
||||
return pid
|
||||
|
||||
def launch_server(self, server, workers=1):
|
||||
wrap = ServerWrapper(server, workers)
|
||||
self.totalwrap = self.totalwrap + 1
|
||||
LOG.info(_LI('Starting %d workers'), wrap.workers)
|
||||
while (self.running and len(wrap.children) < wrap.workers
|
||||
and not wrap.failed):
|
||||
self._start_child(wrap)
|
||||
|
||||
def _wait_child(self):
|
||||
try:
|
||||
# Don't block if no child processes have exited
|
||||
pid, status = os.waitpid(0, os.WNOHANG)
|
||||
if not pid:
|
||||
return None
|
||||
except OSError as exc:
|
||||
if exc.errno not in (errno.EINTR, errno.ECHILD):
|
||||
raise
|
||||
return None
|
||||
|
||||
code = 0
|
||||
if os.WIFSIGNALED(status):
|
||||
sig = os.WTERMSIG(status)
|
||||
LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'),
|
||||
{'pid': pid, 'sig': sig})
|
||||
else:
|
||||
code = os.WEXITSTATUS(status)
|
||||
LOG.info(_LI('Child %(pid)d exited with status %(code)d'),
|
||||
{'pid': pid, 'code': code})
|
||||
|
||||
if pid not in self.children:
|
||||
LOG.warning(_LW('pid %d not in child list'), pid)
|
||||
return None
|
||||
|
||||
wrap = self.children.pop(pid)
|
||||
wrap.children.remove(pid)
|
||||
if 2 == code:
|
||||
wrap.failed = True
|
||||
self.failedwrap = self.failedwrap + 1
|
||||
LOG.info(_LI('_wait_child %d'), self.failedwrap)
|
||||
if self.failedwrap == self.totalwrap:
|
||||
self.running = False
|
||||
return wrap
|
||||
|
||||
def wait(self):
|
||||
"""Loop waiting on children to die and respawning as necessary."""
|
||||
while self.running:
|
||||
wrap = self._wait_child()
|
||||
if not wrap:
|
||||
# Yield to other threads if no children have exited
|
||||
# Sleep for a short time to avoid excessive CPU usage
|
||||
# (see bug #1095346)
|
||||
eventlet.greenthread.sleep(.01)
|
||||
continue
|
||||
|
||||
LOG.info(_LI('wait wrap.failed %s'), wrap.failed)
|
||||
while (self.running and len(wrap.children) < wrap.workers
|
||||
and not wrap.failed):
|
||||
self._start_child(wrap)
|
||||
|
||||
if self.sigcaught:
|
||||
signame = {signal.SIGTERM: 'SIGTERM',
|
||||
signal.SIGINT: 'SIGINT'}[self.sigcaught]
|
||||
LOG.info(_LI('Caught %s, stopping children'), signame)
|
||||
|
||||
for pid in self.children:
|
||||
try:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
except OSError as exc:
|
||||
if exc.errno != errno.ESRCH:
|
||||
raise
|
||||
|
||||
# Wait for children to die
|
||||
if self.children:
|
||||
LOG.info(_LI('Waiting on %d children to exit'), len(self.children))
|
||||
while self.children:
|
||||
self._wait_child()
|
||||
|
||||
|
||||
class Service(object):
|
||||
class Service(service.Service):
|
||||
"""Service object for binaries running on hosts.
|
||||
|
||||
A service takes a manager and enables rpc by listening to queues based
|
||||
@ -469,6 +210,8 @@ class Service(object):
|
||||
pass
|
||||
self.timers = []
|
||||
|
||||
super(Service, self).stop()
|
||||
|
||||
def wait(self):
|
||||
for x in self.timers:
|
||||
try:
|
||||
@ -514,7 +257,7 @@ class Service(object):
|
||||
LOG.exception(_LE('model server went away'))
|
||||
|
||||
|
||||
class WSGIService(object):
|
||||
class WSGIService(service.ServiceBase):
|
||||
"""Provides ability to launch API from a 'paste' configuration."""
|
||||
|
||||
def __init__(self, name, loader=None):
|
||||
@ -533,6 +276,13 @@ class WSGIService(object):
|
||||
self.app = self.loader.load_app(name)
|
||||
self.host = getattr(CONF, '%s_listen' % name, "0.0.0.0")
|
||||
self.port = getattr(CONF, '%s_listen_port' % name, 0)
|
||||
self.workers = getattr(CONF, '%s_workers' % name, None)
|
||||
if self.workers < 1:
|
||||
LOG.warn(
|
||||
_LW("Value of config option %(name)s_workers must be integer "
|
||||
"greater than 1. Input value ignored.") % {'name': name})
|
||||
# Reset workers to default
|
||||
self.workers = None
|
||||
self.server = wsgi.Server(name,
|
||||
self.app,
|
||||
host=self.host,
|
||||
@ -589,6 +339,17 @@ class WSGIService(object):
|
||||
"""
|
||||
self.server.wait()
|
||||
|
||||
def reset(self):
|
||||
"""Reset server greenpool size to default.
|
||||
|
||||
:returns: None
|
||||
"""
|
||||
self.server.reset()
|
||||
|
||||
|
||||
def process_launcher():
|
||||
return service.ProcessLauncher(CONF)
|
||||
|
||||
|
||||
# NOTE(vish): the global launcher is to maintain the existing
|
||||
# functionality of calling service.serve +
|
||||
@ -596,12 +357,11 @@ class WSGIService(object):
|
||||
_launcher = None
|
||||
|
||||
|
||||
def serve(*servers):
|
||||
def serve(server, workers=None):
|
||||
global _launcher
|
||||
if not _launcher:
|
||||
_launcher = Launcher()
|
||||
for server in servers:
|
||||
_launcher.launch_server(server)
|
||||
if _launcher:
|
||||
raise RuntimeError(_('serve() can only be called once'))
|
||||
_launcher = service.launch(CONF, server, workers=workers)
|
||||
|
||||
|
||||
def wait():
|
||||
|
@ -38,10 +38,9 @@ class ManilaCmdAllTestCase(test.TestCase):
|
||||
self.mock_object(log, 'register_options')
|
||||
self.mock_object(log, 'getLogger')
|
||||
self.mock_object(utils, 'monkey_patch')
|
||||
self.mock_object(service, 'process_launcher')
|
||||
self.mock_object(service, 'WSGIService')
|
||||
self.mock_object(service.Service, 'create')
|
||||
self.mock_object(service, 'serve')
|
||||
self.mock_object(service, 'wait')
|
||||
self.wsgi_service = service.WSGIService.return_value
|
||||
self.service = service.Service.create.return_value
|
||||
self.fake_log = log.getLogger.return_value
|
||||
@ -53,34 +52,31 @@ class ManilaCmdAllTestCase(test.TestCase):
|
||||
log.register_options.assert_called_once_with(CONF)
|
||||
log.getLogger.assert_called_once_with('manila.all')
|
||||
utils.monkey_patch.assert_called_once_with()
|
||||
service.process_launcher.assert_called_once_with()
|
||||
service.WSGIService.assert_called_once_with('osapi_share')
|
||||
service.wait.assert_called_once_with()
|
||||
|
||||
def test_main(self):
|
||||
manila_all.main()
|
||||
|
||||
self._common_checks()
|
||||
self.assertFalse(self.fake_log.exception.called)
|
||||
service.serve.assert_has_calls([
|
||||
mock.call(self.wsgi_service, *[self.service] * 3)
|
||||
])
|
||||
self.assertTrue(
|
||||
service.process_launcher.return_value.launch_service.called)
|
||||
self.assertTrue(service.process_launcher.return_value.wait.called)
|
||||
|
||||
@ddt.data(Exception(), SystemExit())
|
||||
def test_main_wsgi_service_osapi_share_exception(self, exc):
|
||||
service.WSGIService.side_effect = exc
|
||||
|
||||
manila_all.main()
|
||||
|
||||
self._common_checks()
|
||||
self.fake_log.exception.assert_called_once_with(mock.ANY)
|
||||
service.serve.assert_has_calls([mock.call(*[self.service] * 3)])
|
||||
|
||||
@ddt.data(Exception(), SystemExit())
|
||||
def test_main_service_create_exception(self, exc):
|
||||
service.Service.create.side_effect = exc
|
||||
@ddt.data(
|
||||
*[(exc, exc_in_wsgi)
|
||||
for exc in (Exception(), SystemExit())
|
||||
for exc_in_wsgi in (True, False)]
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_main_raise_exception(self, exc, exc_in_wsgi):
|
||||
if exc_in_wsgi:
|
||||
service.WSGIService.side_effect = exc
|
||||
else:
|
||||
service.Service.create.side_effect = exc
|
||||
|
||||
manila_all.main()
|
||||
|
||||
self._common_checks()
|
||||
self.fake_log.exception.assert_has_calls([mock.ANY])
|
||||
service.serve.assert_has_calls([mock.call(self.wsgi_service)])
|
||||
|
@ -31,18 +31,18 @@ class ManilaCmdApiTestCase(test.TestCase):
|
||||
self.mock_object(manila_api.log, 'setup')
|
||||
self.mock_object(manila_api.log, 'register_options')
|
||||
self.mock_object(manila_api.utils, 'monkey_patch')
|
||||
self.mock_object(manila_api.service, 'process_launcher')
|
||||
self.mock_object(manila_api.service, 'WSGIService')
|
||||
self.mock_object(manila_api.service, 'serve')
|
||||
self.mock_object(manila_api.service, 'wait')
|
||||
|
||||
manila_api.main()
|
||||
|
||||
process_launcher = manila_api.service.process_launcher
|
||||
process_launcher.assert_called_once_with()
|
||||
self.assertTrue(process_launcher.return_value.launch_service.called)
|
||||
self.assertTrue(process_launcher.return_value.wait.called)
|
||||
self.assertEqual(CONF.project, 'manila')
|
||||
self.assertEqual(CONF.version, version.version_string())
|
||||
manila_api.log.setup.assert_called_once_with(CONF, "manila")
|
||||
manila_api.log.register_options.assert_called_once_with(CONF)
|
||||
manila_api.utils.monkey_patch.assert_called_once_with()
|
||||
manila_api.service.WSGIService.assert_called_once_with('osapi_share')
|
||||
manila_api.service.wait.assert_called_once_with()
|
||||
manila_api.service.serve.assert_called_once_with(
|
||||
manila_api.service.WSGIService.return_value)
|
||||
|
@ -32,10 +32,10 @@ class ManilaCmdShareTestCase(test.TestCase):
|
||||
self.mock_object(manila_share.log, 'setup')
|
||||
self.mock_object(manila_share.log, 'register_options')
|
||||
self.mock_object(manila_share.utils, 'monkey_patch')
|
||||
self.mock_object(manila_share.service, 'ProcessLauncher')
|
||||
self.mock_object(manila_share.service, 'process_launcher')
|
||||
self.mock_object(manila_share.service.Service, 'create')
|
||||
self.launcher = manila_share.service.ProcessLauncher.return_value
|
||||
self.mock_object(self.launcher, 'launch_server')
|
||||
self.launcher = manila_share.service.process_launcher.return_value
|
||||
self.mock_object(self.launcher, 'launch_service')
|
||||
self.mock_object(self.launcher, 'wait')
|
||||
self.server = manila_share.service.Service.create.return_value
|
||||
fake_host = 'fake_host'
|
||||
@ -48,7 +48,7 @@ class ManilaCmdShareTestCase(test.TestCase):
|
||||
manila_share.log.setup.assert_called_once_with(CONF, "manila")
|
||||
manila_share.log.register_options.assert_called_once_with(CONF)
|
||||
manila_share.utils.monkey_patch.assert_called_once_with()
|
||||
manila_share.service.ProcessLauncher.assert_called_once_with()
|
||||
manila_share.service.process_launcher.assert_called_once_with()
|
||||
self.launcher.wait.assert_called_once_with()
|
||||
|
||||
if backends:
|
||||
@ -58,9 +58,9 @@ class ManilaCmdShareTestCase(test.TestCase):
|
||||
service_name=backend,
|
||||
binary='manila-share') for backend in backends
|
||||
])
|
||||
self.launcher.launch_server.assert_has_calls([
|
||||
self.launcher.launch_service.assert_has_calls([
|
||||
mock.call(self.server) for backend in backends])
|
||||
else:
|
||||
manila_share.service.Service.create.assert_called_once_with(
|
||||
binary='manila-share')
|
||||
self.launcher.launch_server.assert_called_once_with(self.server)
|
||||
self.launcher.launch_service.assert_called_once_with(self.server)
|
||||
|
@ -184,24 +184,27 @@ class ServiceTestCase(test.TestCase):
|
||||
|
||||
class TestWSGIService(test.TestCase):
|
||||
|
||||
@mock.patch.object(wsgi.Loader, 'load_app', mock.Mock())
|
||||
def setUp(self):
|
||||
super(self.__class__, self).setUp()
|
||||
self.mock_object(wsgi.Loader, 'load_app')
|
||||
self.test_service = service.WSGIService("test_service")
|
||||
|
||||
def test_service_random_port(self):
|
||||
test_service = service.WSGIService("test_service")
|
||||
self.assertEqual(0, test_service.port)
|
||||
test_service.start()
|
||||
self.assertNotEqual(0, test_service.port)
|
||||
test_service.stop()
|
||||
self.assertEqual(0, self.test_service.port)
|
||||
self.test_service.start()
|
||||
self.assertNotEqual(0, self.test_service.port)
|
||||
self.test_service.stop()
|
||||
wsgi.Loader.load_app.assert_called_once_with("test_service")
|
||||
|
||||
def test_reset_pool_size_to_default(self):
|
||||
self.test_service.start()
|
||||
|
||||
class TestLauncher(test.TestCase):
|
||||
# Stopping the service, which in turn sets pool size to 0
|
||||
self.test_service.stop()
|
||||
self.assertEqual(0, self.test_service.server._pool.size)
|
||||
|
||||
@mock.patch.object(wsgi.Loader, 'load_app', mock.Mock())
|
||||
def test_launch_app(self):
|
||||
self.service = service.WSGIService("test_service")
|
||||
self.assertEqual(0, self.service.port)
|
||||
launcher = service.Launcher()
|
||||
launcher.launch_server(self.service)
|
||||
self.assertEqual(0, self.service.port)
|
||||
launcher.stop()
|
||||
# Resetting pool size to default
|
||||
self.test_service.reset()
|
||||
self.test_service.start()
|
||||
self.assertEqual(1000, self.test_service.server._pool.size)
|
||||
wsgi.Loader.load_app.assert_called_once_with("test_service")
|
||||
|
@ -21,6 +21,8 @@ import ssl
|
||||
import tempfile
|
||||
import urllib2
|
||||
|
||||
import eventlet
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
import six
|
||||
import testtools
|
||||
@ -87,12 +89,11 @@ class TestWSGIServer(test.TestCase):
|
||||
"""WSGI server tests."""
|
||||
|
||||
def test_no_app(self):
|
||||
server = manila.wsgi.Server("test_app", None)
|
||||
server = manila.wsgi.Server("test_app", None, host="127.0.0.1", port=0)
|
||||
self.assertEqual("test_app", server.name)
|
||||
|
||||
def test_start_random_port(self):
|
||||
server = manila.wsgi.Server("test_random_port", None, host="127.0.0.1")
|
||||
self.assertEqual(0, server.port)
|
||||
server.start()
|
||||
self.assertNotEqual(0, server.port)
|
||||
server.stop()
|
||||
@ -113,6 +114,8 @@ class TestWSGIServer(test.TestCase):
|
||||
server.wait()
|
||||
|
||||
def test_app(self):
|
||||
self.mock_object(
|
||||
eventlet, 'spawn', mock.Mock(side_effect=eventlet.spawn))
|
||||
greetings = 'Hello, World!!!'
|
||||
|
||||
def hello_world(env, start_response):
|
||||
@ -123,12 +126,23 @@ class TestWSGIServer(test.TestCase):
|
||||
start_response('200 OK', [('Content-Type', 'text/plain')])
|
||||
return [greetings]
|
||||
|
||||
server = manila.wsgi.Server("test_app", hello_world)
|
||||
server = manila.wsgi.Server(
|
||||
"test_app", hello_world, host="127.0.0.1", port=0)
|
||||
server.start()
|
||||
|
||||
response = urllib2.urlopen('http://127.0.0.1:%d/' % server.port)
|
||||
self.assertEqual(greetings, response.read())
|
||||
|
||||
# Verify provided parameters to eventlet.spawn func
|
||||
eventlet.spawn.assert_called_once_with(
|
||||
func=eventlet.wsgi.server,
|
||||
sock=mock.ANY,
|
||||
site=server.app,
|
||||
protocol=server._protocol,
|
||||
custom_pool=server._pool,
|
||||
log=server._logger,
|
||||
)
|
||||
|
||||
server.stop()
|
||||
|
||||
def test_app_using_ssl(self):
|
||||
@ -143,7 +157,8 @@ class TestWSGIServer(test.TestCase):
|
||||
def hello_world(req):
|
||||
return greetings
|
||||
|
||||
server = manila.wsgi.Server("test_app", hello_world)
|
||||
server = manila.wsgi.Server(
|
||||
"test_app", hello_world, host="127.0.0.1", port=0)
|
||||
server.start()
|
||||
|
||||
if hasattr(ssl, '_create_unverified_context'):
|
||||
@ -190,6 +205,19 @@ class TestWSGIServer(test.TestCase):
|
||||
|
||||
server.stop()
|
||||
|
||||
def test_reset_pool_size_to_default(self):
|
||||
server = manila.wsgi.Server("test_resize", None, host="127.0.0.1")
|
||||
server.start()
|
||||
|
||||
# Stopping the server, which in turn sets pool size to 0
|
||||
server.stop()
|
||||
self.assertEqual(0, server._pool.size)
|
||||
|
||||
# Resetting pool size to default
|
||||
server.reset()
|
||||
server.start()
|
||||
self.assertEqual(1000, server._pool.size)
|
||||
|
||||
|
||||
class ExceptionTest(test.TestCase):
|
||||
|
||||
|
148
manila/wsgi.py
148
manila/wsgi.py
@ -31,7 +31,8 @@ import eventlet.wsgi
|
||||
import greenlet
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_utils import netutils
|
||||
from oslo_service import service
|
||||
from oslo_utils import excutils
|
||||
from paste import deploy
|
||||
import routes.middleware
|
||||
import six
|
||||
@ -40,6 +41,7 @@ import webob.exc
|
||||
|
||||
from manila import exception
|
||||
from manila.i18n import _
|
||||
from manila.i18n import _LE
|
||||
from manila.i18n import _LI
|
||||
|
||||
socket_opts = [
|
||||
@ -91,13 +93,13 @@ CONF.register_opts(eventlet_opts)
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class Server(object):
|
||||
class Server(service.ServiceBase):
|
||||
"""Server class to manage a WSGI server, serving a WSGI application."""
|
||||
|
||||
default_pool_size = 1000
|
||||
|
||||
def __init__(self, name, app, host=None, port=None, pool_size=None,
|
||||
protocol=eventlet.wsgi.HttpProtocol):
|
||||
protocol=eventlet.wsgi.HttpProtocol, backlog=128):
|
||||
"""Initialize, but do not start, a WSGI server.
|
||||
|
||||
:param name: Pretty name for logging.
|
||||
@ -116,10 +118,14 @@ class Server(object):
|
||||
self._server = None
|
||||
self._socket = None
|
||||
self._protocol = protocol
|
||||
self._pool = eventlet.GreenPool(pool_size or self.default_pool_size)
|
||||
self.pool_size = pool_size or self.default_pool_size
|
||||
self._pool = eventlet.GreenPool(self.pool_size)
|
||||
self._logger = log.getLogger("eventlet.wsgi.server")
|
||||
|
||||
def _get_socket(self, host, port, backlog):
|
||||
if backlog < 1:
|
||||
raise exception.InvalidInput(
|
||||
reason='The backlog must be more than 1')
|
||||
|
||||
bind_addr = (host, port)
|
||||
# TODO(dims): eventlet's green dns/socket module does not actually
|
||||
# support IPv6 in getaddrinfo(). We need to get around this in the
|
||||
@ -137,7 +143,7 @@ class Server(object):
|
||||
cert_file = CONF.ssl_cert_file
|
||||
key_file = CONF.ssl_key_file
|
||||
ca_file = CONF.ssl_ca_file
|
||||
use_ssl = cert_file or key_file
|
||||
self._use_ssl = cert_file or key_file
|
||||
|
||||
if cert_file and not os.path.exists(cert_file):
|
||||
raise RuntimeError(_("Unable to find cert_file : %s") % cert_file)
|
||||
@ -148,85 +154,85 @@ class Server(object):
|
||||
if key_file and not os.path.exists(key_file):
|
||||
raise RuntimeError(_("Unable to find key_file : %s") % key_file)
|
||||
|
||||
if use_ssl and (not cert_file or not key_file):
|
||||
if self._use_ssl and (not cert_file or not key_file):
|
||||
raise RuntimeError(_("When running server in SSL mode, you must "
|
||||
"specify both a cert_file and key_file "
|
||||
"option value in your configuration file"))
|
||||
|
||||
def wrap_ssl(sock):
|
||||
ssl_kwargs = {
|
||||
'server_side': True,
|
||||
'certfile': cert_file,
|
||||
'keyfile': key_file,
|
||||
'cert_reqs': ssl.CERT_NONE,
|
||||
}
|
||||
|
||||
if CONF.ssl_ca_file:
|
||||
ssl_kwargs['ca_certs'] = ca_file
|
||||
ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
|
||||
|
||||
return ssl.wrap_socket(sock, **ssl_kwargs)
|
||||
|
||||
sock = None
|
||||
retry_until = time.time() + 30
|
||||
while not sock and time.time() < retry_until:
|
||||
while not self._socket and time.time() < retry_until:
|
||||
try:
|
||||
sock = eventlet.listen(bind_addr,
|
||||
backlog=backlog,
|
||||
family=family)
|
||||
if use_ssl:
|
||||
sock = wrap_ssl(sock)
|
||||
|
||||
self._socket = eventlet.listen(
|
||||
bind_addr, backlog=backlog, family=family)
|
||||
except socket.error as err:
|
||||
if err.args[0] != errno.EADDRINUSE:
|
||||
raise
|
||||
eventlet.sleep(0.1)
|
||||
if not sock:
|
||||
|
||||
if not self._socket:
|
||||
raise RuntimeError(_("Could not bind to %(host)s:%(port)s "
|
||||
"after trying for 30 seconds") %
|
||||
{'host': host, 'port': port})
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
||||
# sockets can hang around forever without keepalive
|
||||
netutils.set_tcp_keepalive(sock,
|
||||
CONF.tcp_keepalive,
|
||||
CONF.tcp_keepidle,
|
||||
CONF.tcp_keepalive_interval,
|
||||
CONF.tcp_keepalive_count)
|
||||
(self._host, self._port) = self._socket.getsockname()[0:2]
|
||||
LOG.info(_LI("%(name)s listening on %(_host)s:%(_port)s"),
|
||||
{'name': self.name, '_host': self._host, '_port': self._port})
|
||||
|
||||
return sock
|
||||
|
||||
def _start(self):
|
||||
"""Run the blocking eventlet WSGI server.
|
||||
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
eventlet.wsgi.server(self._socket,
|
||||
self.app,
|
||||
protocol=self._protocol,
|
||||
custom_pool=self._pool,
|
||||
log=self._logger)
|
||||
|
||||
def start(self, backlog=128):
|
||||
def start(self):
|
||||
"""Start serving a WSGI application.
|
||||
|
||||
:param backlog: Maximum number of queued connections.
|
||||
:returns: None
|
||||
:raises: manila.exception.InvalidInput
|
||||
|
||||
"""
|
||||
if backlog < 1:
|
||||
raise exception.InvalidInput(
|
||||
reason='The backlog must be more than 1')
|
||||
# The server socket object will be closed after server exits,
|
||||
# but the underlying file descriptor will remain open, and will
|
||||
# give bad file descriptor error. So duplicating the socket object,
|
||||
# to keep file descriptor usable.
|
||||
|
||||
self._socket = self._get_socket(self._host,
|
||||
self._port,
|
||||
backlog=backlog)
|
||||
self._server = eventlet.spawn(self._start)
|
||||
(self._host, self._port) = self._socket.getsockname()[0:2]
|
||||
LOG.info(_LI("Started %(name)s on %(_host)s:%(_port)s"),
|
||||
{'name': self.name, '_host': self._host, '_port': self._port})
|
||||
dup_socket = self._socket.dup()
|
||||
if self._use_ssl:
|
||||
try:
|
||||
ssl_kwargs = {
|
||||
'server_side': True,
|
||||
'certfile': CONF.ssl_cert_file,
|
||||
'keyfile': CONF.ssl_key_file,
|
||||
'cert_reqs': ssl.CERT_NONE,
|
||||
}
|
||||
|
||||
if CONF.ssl_ca_file:
|
||||
ssl_kwargs['ca_certs'] = CONF.ssl_ca_file
|
||||
ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
|
||||
|
||||
dup_socket = ssl.wrap_socket(dup_socket,
|
||||
**ssl_kwargs)
|
||||
|
||||
dup_socket.setsockopt(socket.SOL_SOCKET,
|
||||
socket.SO_REUSEADDR, 1)
|
||||
|
||||
# sockets can hang around forever without keepalive
|
||||
dup_socket.setsockopt(socket.SOL_SOCKET,
|
||||
socket.SO_KEEPALIVE, 1)
|
||||
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(
|
||||
_LE("Failed to start %(name)s on %(_host)s:%(_port)s "
|
||||
"with SSL support."),
|
||||
{"name": self.name, "_host": self._host,
|
||||
"_port": self._port}
|
||||
)
|
||||
|
||||
wsgi_kwargs = {
|
||||
'func': eventlet.wsgi.server,
|
||||
'sock': dup_socket,
|
||||
'site': self.app,
|
||||
'protocol': self._protocol,
|
||||
'custom_pool': self._pool,
|
||||
'log': self._logger,
|
||||
}
|
||||
|
||||
self._server = eventlet.spawn(**wsgi_kwargs)
|
||||
|
||||
@property
|
||||
def host(self):
|
||||
@ -246,7 +252,10 @@ class Server(object):
|
||||
|
||||
"""
|
||||
LOG.info(_LI("Stopping WSGI server."))
|
||||
self._server.kill()
|
||||
if self._server is not None:
|
||||
# Resize pool to stop new requests from being processed
|
||||
self._pool.resize(0)
|
||||
self._server.kill()
|
||||
|
||||
def wait(self):
|
||||
"""Block, until the server has stopped.
|
||||
@ -257,10 +266,19 @@ class Server(object):
|
||||
|
||||
"""
|
||||
try:
|
||||
self._server.wait()
|
||||
if self._server is not None:
|
||||
self._pool.waitall()
|
||||
self._server.wait()
|
||||
except greenlet.GreenletExit:
|
||||
LOG.info(_LI("WSGI server has stopped."))
|
||||
|
||||
def reset(self):
|
||||
"""Reset server greenpool size to default.
|
||||
|
||||
:returns: None
|
||||
"""
|
||||
self._pool.resize(self.pool_size)
|
||||
|
||||
|
||||
class Request(webob.Request):
|
||||
pass
|
||||
|
Loading…
Reference in New Issue
Block a user