Enable multi-process for API service

Due to the limit of Python interpreter, API service of Manila can't
really utilize underlying multi-core architecture even libraries
like eventlet has been used. To make API service much more scalable,
we'd adopt multi-process (worker) mode that has been used for long
in Glance/Swift/Nova/Cinder.

The default behavior isn't changed with this patch, Manila API
service will still run in one process (default value of
osapi_volume_workers is None).

Implementation wise, a good portion of manila/service.py has been
removed because those content has been merged in Oslo version of
service module.  manila/wsgi.py is also updated to adopt the change
for multiple WSGI servers running in separate processes.

DocImpact: 'New config option osapi_share_workers is used to specify
number of API service workers (OS processes) to launch for Manila
API service.  Setting this config option to a proper value (e.g.
osapi_share_workers = # of CPU cores/threads of the machine) can
greatly improve the total throughput of API service [# of API
requests can be handled per second].'

Also removed out-dated comments in manila/cmd/api.py due to the fact
that this bug [1] has been fixed in eventlet 0.9.13

This commit is port from two Cinder changes - [2] and [3].

[1] https://bitbucket.org/eventlet/eventlet/issue/92/eventletgreen-override-of-oswaitpid
[2] I8361d0dc0d43040e48634ff1aee1324e5e0af466
[3] Iab32a3fe230a11692a8cad274304214247d6c2c6
Implement bp: multi-process-api-service

Change-Id: I1cb98c938fd4e1dabe75c78a7ef392d6d7387dab
This commit is contained in:
Valeriy Ponomaryov 2015-07-20 18:35:02 +03:00
parent 0de9d3f993
commit aea3edc229
10 changed files with 204 additions and 401 deletions

View File

@ -54,20 +54,20 @@ def main():
LOG = log.getLogger('manila.all')
utils.monkey_patch()
servers = []
launcher = service.process_launcher()
# manila-api
try:
servers.append(service.WSGIService('osapi_share'))
server = service.WSGIService('osapi_share')
launcher.launch_service(server, workers=server.workers or 1)
except (Exception, SystemExit):
LOG.exception(_LE('Failed to load osapi_share'))
for binary in ['manila-share', 'manila-scheduler', 'manila-api']:
try:
servers.append(service.Service.create(binary=binary))
launcher.launch_service(service.Service.create(binary=binary))
except (Exception, SystemExit):
LOG.exception(_LE('Failed to load %s'), binary)
service.serve(*servers)
service.wait()
launcher.wait()
if __name__ == '__main__':

View File

@ -18,10 +18,6 @@
"""Starter script for manila OS API."""
# NOTE(jdg): If we port over multi worker code from Nova
# we'll need to set monkey_patch(os=False), unless
# eventlet is updated/released to fix the root issue
import eventlet
eventlet.monkey_patch()
@ -48,9 +44,11 @@ def main():
config.verify_share_protocols()
log.setup(CONF, "manila")
utils.monkey_patch()
launcher = service.process_launcher()
server = service.WSGIService('osapi_share')
service.serve(server)
service.wait()
launcher.launch_service(server, workers=server.workers or 1)
launcher.wait()
if __name__ == '__main__':

View File

@ -42,17 +42,17 @@ def main():
version=version.version_string())
log.setup(CONF, "manila")
utils.monkey_patch()
launcher = service.ProcessLauncher()
launcher = service.process_launcher()
if CONF.enabled_share_backends:
for backend in CONF.enabled_share_backends:
host = "%s@%s" % (CONF.host, backend)
server = service.Service.create(host=host,
service_name=backend,
binary='manila-share')
launcher.launch_server(server)
launcher.launch_service(server)
else:
server = service.Service.create(binary='manila-share')
launcher.launch_server(server)
launcher.launch_service(server)
launcher.wait()

View File

@ -17,20 +17,15 @@
"""Generic Node base class for all workers that run on hosts."""
import errno
import inspect
import os
import random
import signal
import sys
import time
import eventlet
import greenlet
from oslo_config import cfg
from oslo_log import log
import oslo_messaging as messaging
from oslo_service import loopingcall
from oslo_service import service
from oslo_utils import importutils
from manila import context
@ -62,271 +57,17 @@ service_opts = [
help='IP address for OpenStack Share API to listen on.'),
cfg.IntOpt('osapi_share_listen_port',
default=8786,
help='Port for OpenStack Share API to listen on.'), ]
help='Port for OpenStack Share API to listen on.'),
cfg.IntOpt('osapi_share_workers',
default=1,
help='Number of workers for OpenStack Share API service.'),
]
CONF = cfg.CONF
CONF.register_opts(service_opts)
class SignalExit(SystemExit):
def __init__(self, signo, exccode=1):
super(SignalExit, self).__init__(exccode)
self.signo = signo
class Launcher(object):
"""Launch one or more services and wait for them to complete."""
def __init__(self):
"""Initialize the service launcher.
:returns: None
"""
self._services = []
@staticmethod
def run_server(server):
"""Start and wait for a server to finish.
:param service: Server to run and wait for.
:returns: None
"""
server.start()
server.wait()
def launch_server(self, server):
"""Load and start the given server.
:param server: The server you would like to start.
:returns: None
"""
gt = eventlet.spawn(self.run_server, server)
self._services.append(gt)
def stop(self):
"""Stop all services which are currently running.
:returns: None
"""
for service in self._services:
service.kill()
def wait(self):
"""Waits until all services have been stopped, and then returns.
:returns: None
"""
def sigterm(sig, frame):
LOG.info(_LI("SIGTERM received"))
# NOTE(jk0): Raise a ^C which is caught by the caller and cleanly
# shuts down the service. This does not yet handle eventlet
# threads.
raise KeyboardInterrupt
signal.signal(signal.SIGTERM, sigterm)
for service in self._services:
try:
service.wait()
except greenlet.GreenletExit:
pass
class ServerWrapper(object):
def __init__(self, server, workers):
self.server = server
self.workers = workers
self.children = set()
self.forktimes = []
self.failed = False
class ProcessLauncher(object):
def __init__(self):
self.children = {}
self.sigcaught = None
self.totalwrap = 0
self.failedwrap = 0
self.running = True
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
def _handle_signal(self, signo, frame):
self.sigcaught = signo
self.running = False
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
# dies unexpectedly
self.readpipe.read()
LOG.info(_LI('Parent process has died unexpectedly, exiting'))
sys.exit(1)
def _child_process(self, server):
# Setup child signal handlers differently
def _sigterm(*args):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SignalExit(signal.SIGTERM)
signal.signal(signal.SIGTERM, _sigterm)
# Block SIGINT and let the parent send us a SIGTERM
# signal.signal(signal.SIGINT, signal.SIG_IGN)
# This differs from the behavior in nova in that we dont ignore this
# It allows the non-wsgi services to be terminated properly
signal.signal(signal.SIGINT, _sigterm)
# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()
# Close write to ensure only parent has it open
os.close(self.writepipe)
# Create greenthread to watch for parent to close pipe
eventlet.spawn(self._pipe_watcher)
# Reseed random number generator
random.seed()
launcher = Launcher()
launcher.run_server(server)
def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
# Limit ourselves to one process a second (over the period of
# number of workers * 1 second). This will allow workers to
# start up quickly but ensure we don't fork off children that
# die instantly too quickly.
if time.time() - wrap.forktimes[0] < wrap.workers:
LOG.info(_LI('Forking too fast, sleeping'))
time.sleep(1)
wrap.forktimes.pop(0)
wrap.forktimes.append(time.time())
pid = os.fork()
if pid == 0:
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children.
status = 0
try:
self._child_process(wrap.server)
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[exc.signo]
LOG.info(_LI('Caught %s, exiting'), signame)
status = exc.code
except SystemExit as exc:
status = exc.code
except BaseException:
LOG.exception(_LE('Unhandled exception'))
status = 2
finally:
wrap.server.stop()
os._exit(status)
LOG.info(_LI('Started child %d'), pid)
wrap.children.add(pid)
self.children[pid] = wrap
return pid
def launch_server(self, server, workers=1):
wrap = ServerWrapper(server, workers)
self.totalwrap = self.totalwrap + 1
LOG.info(_LI('Starting %d workers'), wrap.workers)
while (self.running and len(wrap.children) < wrap.workers
and not wrap.failed):
self._start_child(wrap)
def _wait_child(self):
try:
# Don't block if no child processes have exited
pid, status = os.waitpid(0, os.WNOHANG)
if not pid:
return None
except OSError as exc:
if exc.errno not in (errno.EINTR, errno.ECHILD):
raise
return None
code = 0
if os.WIFSIGNALED(status):
sig = os.WTERMSIG(status)
LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'),
{'pid': pid, 'sig': sig})
else:
code = os.WEXITSTATUS(status)
LOG.info(_LI('Child %(pid)d exited with status %(code)d'),
{'pid': pid, 'code': code})
if pid not in self.children:
LOG.warning(_LW('pid %d not in child list'), pid)
return None
wrap = self.children.pop(pid)
wrap.children.remove(pid)
if 2 == code:
wrap.failed = True
self.failedwrap = self.failedwrap + 1
LOG.info(_LI('_wait_child %d'), self.failedwrap)
if self.failedwrap == self.totalwrap:
self.running = False
return wrap
def wait(self):
"""Loop waiting on children to die and respawning as necessary."""
while self.running:
wrap = self._wait_child()
if not wrap:
# Yield to other threads if no children have exited
# Sleep for a short time to avoid excessive CPU usage
# (see bug #1095346)
eventlet.greenthread.sleep(.01)
continue
LOG.info(_LI('wait wrap.failed %s'), wrap.failed)
while (self.running and len(wrap.children) < wrap.workers
and not wrap.failed):
self._start_child(wrap)
if self.sigcaught:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}[self.sigcaught]
LOG.info(_LI('Caught %s, stopping children'), signame)
for pid in self.children:
try:
os.kill(pid, signal.SIGTERM)
except OSError as exc:
if exc.errno != errno.ESRCH:
raise
# Wait for children to die
if self.children:
LOG.info(_LI('Waiting on %d children to exit'), len(self.children))
while self.children:
self._wait_child()
class Service(object):
class Service(service.Service):
"""Service object for binaries running on hosts.
A service takes a manager and enables rpc by listening to queues based
@ -469,6 +210,8 @@ class Service(object):
pass
self.timers = []
super(Service, self).stop()
def wait(self):
for x in self.timers:
try:
@ -514,7 +257,7 @@ class Service(object):
LOG.exception(_LE('model server went away'))
class WSGIService(object):
class WSGIService(service.ServiceBase):
"""Provides ability to launch API from a 'paste' configuration."""
def __init__(self, name, loader=None):
@ -533,6 +276,13 @@ class WSGIService(object):
self.app = self.loader.load_app(name)
self.host = getattr(CONF, '%s_listen' % name, "0.0.0.0")
self.port = getattr(CONF, '%s_listen_port' % name, 0)
self.workers = getattr(CONF, '%s_workers' % name, None)
if self.workers < 1:
LOG.warn(
_LW("Value of config option %(name)s_workers must be integer "
"greater than 1. Input value ignored.") % {'name': name})
# Reset workers to default
self.workers = None
self.server = wsgi.Server(name,
self.app,
host=self.host,
@ -589,6 +339,17 @@ class WSGIService(object):
"""
self.server.wait()
def reset(self):
"""Reset server greenpool size to default.
:returns: None
"""
self.server.reset()
def process_launcher():
return service.ProcessLauncher(CONF)
# NOTE(vish): the global launcher is to maintain the existing
# functionality of calling service.serve +
@ -596,12 +357,11 @@ class WSGIService(object):
_launcher = None
def serve(*servers):
def serve(server, workers=None):
global _launcher
if not _launcher:
_launcher = Launcher()
for server in servers:
_launcher.launch_server(server)
if _launcher:
raise RuntimeError(_('serve() can only be called once'))
_launcher = service.launch(CONF, server, workers=workers)
def wait():

View File

@ -38,10 +38,9 @@ class ManilaCmdAllTestCase(test.TestCase):
self.mock_object(log, 'register_options')
self.mock_object(log, 'getLogger')
self.mock_object(utils, 'monkey_patch')
self.mock_object(service, 'process_launcher')
self.mock_object(service, 'WSGIService')
self.mock_object(service.Service, 'create')
self.mock_object(service, 'serve')
self.mock_object(service, 'wait')
self.wsgi_service = service.WSGIService.return_value
self.service = service.Service.create.return_value
self.fake_log = log.getLogger.return_value
@ -53,34 +52,31 @@ class ManilaCmdAllTestCase(test.TestCase):
log.register_options.assert_called_once_with(CONF)
log.getLogger.assert_called_once_with('manila.all')
utils.monkey_patch.assert_called_once_with()
service.process_launcher.assert_called_once_with()
service.WSGIService.assert_called_once_with('osapi_share')
service.wait.assert_called_once_with()
def test_main(self):
manila_all.main()
self._common_checks()
self.assertFalse(self.fake_log.exception.called)
service.serve.assert_has_calls([
mock.call(self.wsgi_service, *[self.service] * 3)
])
self.assertTrue(
service.process_launcher.return_value.launch_service.called)
self.assertTrue(service.process_launcher.return_value.wait.called)
@ddt.data(Exception(), SystemExit())
def test_main_wsgi_service_osapi_share_exception(self, exc):
service.WSGIService.side_effect = exc
manila_all.main()
self._common_checks()
self.fake_log.exception.assert_called_once_with(mock.ANY)
service.serve.assert_has_calls([mock.call(*[self.service] * 3)])
@ddt.data(Exception(), SystemExit())
def test_main_service_create_exception(self, exc):
service.Service.create.side_effect = exc
@ddt.data(
*[(exc, exc_in_wsgi)
for exc in (Exception(), SystemExit())
for exc_in_wsgi in (True, False)]
)
@ddt.unpack
def test_main_raise_exception(self, exc, exc_in_wsgi):
if exc_in_wsgi:
service.WSGIService.side_effect = exc
else:
service.Service.create.side_effect = exc
manila_all.main()
self._common_checks()
self.fake_log.exception.assert_has_calls([mock.ANY])
service.serve.assert_has_calls([mock.call(self.wsgi_service)])

View File

@ -31,18 +31,18 @@ class ManilaCmdApiTestCase(test.TestCase):
self.mock_object(manila_api.log, 'setup')
self.mock_object(manila_api.log, 'register_options')
self.mock_object(manila_api.utils, 'monkey_patch')
self.mock_object(manila_api.service, 'process_launcher')
self.mock_object(manila_api.service, 'WSGIService')
self.mock_object(manila_api.service, 'serve')
self.mock_object(manila_api.service, 'wait')
manila_api.main()
process_launcher = manila_api.service.process_launcher
process_launcher.assert_called_once_with()
self.assertTrue(process_launcher.return_value.launch_service.called)
self.assertTrue(process_launcher.return_value.wait.called)
self.assertEqual(CONF.project, 'manila')
self.assertEqual(CONF.version, version.version_string())
manila_api.log.setup.assert_called_once_with(CONF, "manila")
manila_api.log.register_options.assert_called_once_with(CONF)
manila_api.utils.monkey_patch.assert_called_once_with()
manila_api.service.WSGIService.assert_called_once_with('osapi_share')
manila_api.service.wait.assert_called_once_with()
manila_api.service.serve.assert_called_once_with(
manila_api.service.WSGIService.return_value)

View File

@ -32,10 +32,10 @@ class ManilaCmdShareTestCase(test.TestCase):
self.mock_object(manila_share.log, 'setup')
self.mock_object(manila_share.log, 'register_options')
self.mock_object(manila_share.utils, 'monkey_patch')
self.mock_object(manila_share.service, 'ProcessLauncher')
self.mock_object(manila_share.service, 'process_launcher')
self.mock_object(manila_share.service.Service, 'create')
self.launcher = manila_share.service.ProcessLauncher.return_value
self.mock_object(self.launcher, 'launch_server')
self.launcher = manila_share.service.process_launcher.return_value
self.mock_object(self.launcher, 'launch_service')
self.mock_object(self.launcher, 'wait')
self.server = manila_share.service.Service.create.return_value
fake_host = 'fake_host'
@ -48,7 +48,7 @@ class ManilaCmdShareTestCase(test.TestCase):
manila_share.log.setup.assert_called_once_with(CONF, "manila")
manila_share.log.register_options.assert_called_once_with(CONF)
manila_share.utils.monkey_patch.assert_called_once_with()
manila_share.service.ProcessLauncher.assert_called_once_with()
manila_share.service.process_launcher.assert_called_once_with()
self.launcher.wait.assert_called_once_with()
if backends:
@ -58,9 +58,9 @@ class ManilaCmdShareTestCase(test.TestCase):
service_name=backend,
binary='manila-share') for backend in backends
])
self.launcher.launch_server.assert_has_calls([
self.launcher.launch_service.assert_has_calls([
mock.call(self.server) for backend in backends])
else:
manila_share.service.Service.create.assert_called_once_with(
binary='manila-share')
self.launcher.launch_server.assert_called_once_with(self.server)
self.launcher.launch_service.assert_called_once_with(self.server)

View File

@ -184,24 +184,27 @@ class ServiceTestCase(test.TestCase):
class TestWSGIService(test.TestCase):
@mock.patch.object(wsgi.Loader, 'load_app', mock.Mock())
def setUp(self):
super(self.__class__, self).setUp()
self.mock_object(wsgi.Loader, 'load_app')
self.test_service = service.WSGIService("test_service")
def test_service_random_port(self):
test_service = service.WSGIService("test_service")
self.assertEqual(0, test_service.port)
test_service.start()
self.assertNotEqual(0, test_service.port)
test_service.stop()
self.assertEqual(0, self.test_service.port)
self.test_service.start()
self.assertNotEqual(0, self.test_service.port)
self.test_service.stop()
wsgi.Loader.load_app.assert_called_once_with("test_service")
def test_reset_pool_size_to_default(self):
self.test_service.start()
class TestLauncher(test.TestCase):
# Stopping the service, which in turn sets pool size to 0
self.test_service.stop()
self.assertEqual(0, self.test_service.server._pool.size)
@mock.patch.object(wsgi.Loader, 'load_app', mock.Mock())
def test_launch_app(self):
self.service = service.WSGIService("test_service")
self.assertEqual(0, self.service.port)
launcher = service.Launcher()
launcher.launch_server(self.service)
self.assertEqual(0, self.service.port)
launcher.stop()
# Resetting pool size to default
self.test_service.reset()
self.test_service.start()
self.assertEqual(1000, self.test_service.server._pool.size)
wsgi.Loader.load_app.assert_called_once_with("test_service")

View File

@ -21,6 +21,8 @@ import ssl
import tempfile
import urllib2
import eventlet
import mock
from oslo_config import cfg
import six
import testtools
@ -87,12 +89,11 @@ class TestWSGIServer(test.TestCase):
"""WSGI server tests."""
def test_no_app(self):
server = manila.wsgi.Server("test_app", None)
server = manila.wsgi.Server("test_app", None, host="127.0.0.1", port=0)
self.assertEqual("test_app", server.name)
def test_start_random_port(self):
server = manila.wsgi.Server("test_random_port", None, host="127.0.0.1")
self.assertEqual(0, server.port)
server.start()
self.assertNotEqual(0, server.port)
server.stop()
@ -113,6 +114,8 @@ class TestWSGIServer(test.TestCase):
server.wait()
def test_app(self):
self.mock_object(
eventlet, 'spawn', mock.Mock(side_effect=eventlet.spawn))
greetings = 'Hello, World!!!'
def hello_world(env, start_response):
@ -123,12 +126,23 @@ class TestWSGIServer(test.TestCase):
start_response('200 OK', [('Content-Type', 'text/plain')])
return [greetings]
server = manila.wsgi.Server("test_app", hello_world)
server = manila.wsgi.Server(
"test_app", hello_world, host="127.0.0.1", port=0)
server.start()
response = urllib2.urlopen('http://127.0.0.1:%d/' % server.port)
self.assertEqual(greetings, response.read())
# Verify provided parameters to eventlet.spawn func
eventlet.spawn.assert_called_once_with(
func=eventlet.wsgi.server,
sock=mock.ANY,
site=server.app,
protocol=server._protocol,
custom_pool=server._pool,
log=server._logger,
)
server.stop()
def test_app_using_ssl(self):
@ -143,7 +157,8 @@ class TestWSGIServer(test.TestCase):
def hello_world(req):
return greetings
server = manila.wsgi.Server("test_app", hello_world)
server = manila.wsgi.Server(
"test_app", hello_world, host="127.0.0.1", port=0)
server.start()
if hasattr(ssl, '_create_unverified_context'):
@ -190,6 +205,19 @@ class TestWSGIServer(test.TestCase):
server.stop()
def test_reset_pool_size_to_default(self):
server = manila.wsgi.Server("test_resize", None, host="127.0.0.1")
server.start()
# Stopping the server, which in turn sets pool size to 0
server.stop()
self.assertEqual(0, server._pool.size)
# Resetting pool size to default
server.reset()
server.start()
self.assertEqual(1000, server._pool.size)
class ExceptionTest(test.TestCase):

View File

@ -31,7 +31,8 @@ import eventlet.wsgi
import greenlet
from oslo_config import cfg
from oslo_log import log
from oslo_utils import netutils
from oslo_service import service
from oslo_utils import excutils
from paste import deploy
import routes.middleware
import six
@ -40,6 +41,7 @@ import webob.exc
from manila import exception
from manila.i18n import _
from manila.i18n import _LE
from manila.i18n import _LI
socket_opts = [
@ -91,13 +93,13 @@ CONF.register_opts(eventlet_opts)
LOG = log.getLogger(__name__)
class Server(object):
class Server(service.ServiceBase):
"""Server class to manage a WSGI server, serving a WSGI application."""
default_pool_size = 1000
def __init__(self, name, app, host=None, port=None, pool_size=None,
protocol=eventlet.wsgi.HttpProtocol):
protocol=eventlet.wsgi.HttpProtocol, backlog=128):
"""Initialize, but do not start, a WSGI server.
:param name: Pretty name for logging.
@ -116,10 +118,14 @@ class Server(object):
self._server = None
self._socket = None
self._protocol = protocol
self._pool = eventlet.GreenPool(pool_size or self.default_pool_size)
self.pool_size = pool_size or self.default_pool_size
self._pool = eventlet.GreenPool(self.pool_size)
self._logger = log.getLogger("eventlet.wsgi.server")
def _get_socket(self, host, port, backlog):
if backlog < 1:
raise exception.InvalidInput(
reason='The backlog must be more than 1')
bind_addr = (host, port)
# TODO(dims): eventlet's green dns/socket module does not actually
# support IPv6 in getaddrinfo(). We need to get around this in the
@ -137,7 +143,7 @@ class Server(object):
cert_file = CONF.ssl_cert_file
key_file = CONF.ssl_key_file
ca_file = CONF.ssl_ca_file
use_ssl = cert_file or key_file
self._use_ssl = cert_file or key_file
if cert_file and not os.path.exists(cert_file):
raise RuntimeError(_("Unable to find cert_file : %s") % cert_file)
@ -148,85 +154,85 @@ class Server(object):
if key_file and not os.path.exists(key_file):
raise RuntimeError(_("Unable to find key_file : %s") % key_file)
if use_ssl and (not cert_file or not key_file):
if self._use_ssl and (not cert_file or not key_file):
raise RuntimeError(_("When running server in SSL mode, you must "
"specify both a cert_file and key_file "
"option value in your configuration file"))
def wrap_ssl(sock):
ssl_kwargs = {
'server_side': True,
'certfile': cert_file,
'keyfile': key_file,
'cert_reqs': ssl.CERT_NONE,
}
if CONF.ssl_ca_file:
ssl_kwargs['ca_certs'] = ca_file
ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
return ssl.wrap_socket(sock, **ssl_kwargs)
sock = None
retry_until = time.time() + 30
while not sock and time.time() < retry_until:
while not self._socket and time.time() < retry_until:
try:
sock = eventlet.listen(bind_addr,
backlog=backlog,
family=family)
if use_ssl:
sock = wrap_ssl(sock)
self._socket = eventlet.listen(
bind_addr, backlog=backlog, family=family)
except socket.error as err:
if err.args[0] != errno.EADDRINUSE:
raise
eventlet.sleep(0.1)
if not sock:
if not self._socket:
raise RuntimeError(_("Could not bind to %(host)s:%(port)s "
"after trying for 30 seconds") %
{'host': host, 'port': port})
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# sockets can hang around forever without keepalive
netutils.set_tcp_keepalive(sock,
CONF.tcp_keepalive,
CONF.tcp_keepidle,
CONF.tcp_keepalive_interval,
CONF.tcp_keepalive_count)
(self._host, self._port) = self._socket.getsockname()[0:2]
LOG.info(_LI("%(name)s listening on %(_host)s:%(_port)s"),
{'name': self.name, '_host': self._host, '_port': self._port})
return sock
def _start(self):
"""Run the blocking eventlet WSGI server.
:returns: None
"""
eventlet.wsgi.server(self._socket,
self.app,
protocol=self._protocol,
custom_pool=self._pool,
log=self._logger)
def start(self, backlog=128):
def start(self):
"""Start serving a WSGI application.
:param backlog: Maximum number of queued connections.
:returns: None
:raises: manila.exception.InvalidInput
"""
if backlog < 1:
raise exception.InvalidInput(
reason='The backlog must be more than 1')
# The server socket object will be closed after server exits,
# but the underlying file descriptor will remain open, and will
# give bad file descriptor error. So duplicating the socket object,
# to keep file descriptor usable.
self._socket = self._get_socket(self._host,
self._port,
backlog=backlog)
self._server = eventlet.spawn(self._start)
(self._host, self._port) = self._socket.getsockname()[0:2]
LOG.info(_LI("Started %(name)s on %(_host)s:%(_port)s"),
{'name': self.name, '_host': self._host, '_port': self._port})
dup_socket = self._socket.dup()
if self._use_ssl:
try:
ssl_kwargs = {
'server_side': True,
'certfile': CONF.ssl_cert_file,
'keyfile': CONF.ssl_key_file,
'cert_reqs': ssl.CERT_NONE,
}
if CONF.ssl_ca_file:
ssl_kwargs['ca_certs'] = CONF.ssl_ca_file
ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
dup_socket = ssl.wrap_socket(dup_socket,
**ssl_kwargs)
dup_socket.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
# sockets can hang around forever without keepalive
dup_socket.setsockopt(socket.SOL_SOCKET,
socket.SO_KEEPALIVE, 1)
except Exception:
with excutils.save_and_reraise_exception():
LOG.error(
_LE("Failed to start %(name)s on %(_host)s:%(_port)s "
"with SSL support."),
{"name": self.name, "_host": self._host,
"_port": self._port}
)
wsgi_kwargs = {
'func': eventlet.wsgi.server,
'sock': dup_socket,
'site': self.app,
'protocol': self._protocol,
'custom_pool': self._pool,
'log': self._logger,
}
self._server = eventlet.spawn(**wsgi_kwargs)
@property
def host(self):
@ -246,7 +252,10 @@ class Server(object):
"""
LOG.info(_LI("Stopping WSGI server."))
self._server.kill()
if self._server is not None:
# Resize pool to stop new requests from being processed
self._pool.resize(0)
self._server.kill()
def wait(self):
"""Block, until the server has stopped.
@ -257,10 +266,19 @@ class Server(object):
"""
try:
self._server.wait()
if self._server is not None:
self._pool.waitall()
self._server.wait()
except greenlet.GreenletExit:
LOG.info(_LI("WSGI server has stopped."))
def reset(self):
"""Reset server greenpool size to default.
:returns: None
"""
self._pool.resize(self.pool_size)
class Request(webob.Request):
pass