diff --git a/manila/cmd/all.py b/manila/cmd/all.py index 71833223bc..c8bd24d6b3 100755 --- a/manila/cmd/all.py +++ b/manila/cmd/all.py @@ -54,20 +54,20 @@ def main(): LOG = log.getLogger('manila.all') utils.monkey_patch() - servers = [] + launcher = service.process_launcher() # manila-api try: - servers.append(service.WSGIService('osapi_share')) + server = service.WSGIService('osapi_share') + launcher.launch_service(server, workers=server.workers or 1) except (Exception, SystemExit): LOG.exception(_LE('Failed to load osapi_share')) for binary in ['manila-share', 'manila-scheduler', 'manila-api']: try: - servers.append(service.Service.create(binary=binary)) + launcher.launch_service(service.Service.create(binary=binary)) except (Exception, SystemExit): LOG.exception(_LE('Failed to load %s'), binary) - service.serve(*servers) - service.wait() + launcher.wait() if __name__ == '__main__': diff --git a/manila/cmd/api.py b/manila/cmd/api.py index b2d8821336..b72343d126 100755 --- a/manila/cmd/api.py +++ b/manila/cmd/api.py @@ -18,10 +18,6 @@ """Starter script for manila OS API.""" -# NOTE(jdg): If we port over multi worker code from Nova -# we'll need to set monkey_patch(os=False), unless -# eventlet is updated/released to fix the root issue - import eventlet eventlet.monkey_patch() @@ -48,9 +44,11 @@ def main(): config.verify_share_protocols() log.setup(CONF, "manila") utils.monkey_patch() + + launcher = service.process_launcher() server = service.WSGIService('osapi_share') - service.serve(server) - service.wait() + launcher.launch_service(server, workers=server.workers or 1) + launcher.wait() if __name__ == '__main__': diff --git a/manila/cmd/share.py b/manila/cmd/share.py index 335f922e21..8c0f2b5b19 100755 --- a/manila/cmd/share.py +++ b/manila/cmd/share.py @@ -42,17 +42,17 @@ def main(): version=version.version_string()) log.setup(CONF, "manila") utils.monkey_patch() - launcher = service.ProcessLauncher() + launcher = service.process_launcher() if CONF.enabled_share_backends: for backend in CONF.enabled_share_backends: host = "%s@%s" % (CONF.host, backend) server = service.Service.create(host=host, service_name=backend, binary='manila-share') - launcher.launch_server(server) + launcher.launch_service(server) else: server = service.Service.create(binary='manila-share') - launcher.launch_server(server) + launcher.launch_service(server) launcher.wait() diff --git a/manila/service.py b/manila/service.py index 6f14e57279..af040090f5 100644 --- a/manila/service.py +++ b/manila/service.py @@ -17,20 +17,15 @@ """Generic Node base class for all workers that run on hosts.""" -import errno import inspect import os import random -import signal -import sys -import time -import eventlet -import greenlet from oslo_config import cfg from oslo_log import log import oslo_messaging as messaging from oslo_service import loopingcall +from oslo_service import service from oslo_utils import importutils from manila import context @@ -62,271 +57,17 @@ service_opts = [ help='IP address for OpenStack Share API to listen on.'), cfg.IntOpt('osapi_share_listen_port', default=8786, - help='Port for OpenStack Share API to listen on.'), ] + help='Port for OpenStack Share API to listen on.'), + cfg.IntOpt('osapi_share_workers', + default=1, + help='Number of workers for OpenStack Share API service.'), +] CONF = cfg.CONF CONF.register_opts(service_opts) -class SignalExit(SystemExit): - def __init__(self, signo, exccode=1): - super(SignalExit, self).__init__(exccode) - self.signo = signo - - -class Launcher(object): - """Launch one or more services and wait for them to complete.""" - - def __init__(self): - """Initialize the service launcher. - - :returns: None - - """ - self._services = [] - - @staticmethod - def run_server(server): - """Start and wait for a server to finish. - - :param service: Server to run and wait for. - :returns: None - - """ - server.start() - server.wait() - - def launch_server(self, server): - """Load and start the given server. - - :param server: The server you would like to start. - :returns: None - - """ - gt = eventlet.spawn(self.run_server, server) - self._services.append(gt) - - def stop(self): - """Stop all services which are currently running. - - :returns: None - - """ - for service in self._services: - service.kill() - - def wait(self): - """Waits until all services have been stopped, and then returns. - - :returns: None - - """ - def sigterm(sig, frame): - LOG.info(_LI("SIGTERM received")) - # NOTE(jk0): Raise a ^C which is caught by the caller and cleanly - # shuts down the service. This does not yet handle eventlet - # threads. - raise KeyboardInterrupt - - signal.signal(signal.SIGTERM, sigterm) - - for service in self._services: - try: - service.wait() - except greenlet.GreenletExit: - pass - - -class ServerWrapper(object): - def __init__(self, server, workers): - self.server = server - self.workers = workers - self.children = set() - self.forktimes = [] - self.failed = False - - -class ProcessLauncher(object): - def __init__(self): - self.children = {} - self.sigcaught = None - self.totalwrap = 0 - self.failedwrap = 0 - self.running = True - rfd, self.writepipe = os.pipe() - self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') - - signal.signal(signal.SIGTERM, self._handle_signal) - signal.signal(signal.SIGINT, self._handle_signal) - - def _handle_signal(self, signo, frame): - self.sigcaught = signo - self.running = False - - # Allow the process to be killed again and die from natural causes - signal.signal(signal.SIGTERM, signal.SIG_DFL) - signal.signal(signal.SIGINT, signal.SIG_DFL) - - def _pipe_watcher(self): - # This will block until the write end is closed when the parent - # dies unexpectedly - self.readpipe.read() - - LOG.info(_LI('Parent process has died unexpectedly, exiting')) - - sys.exit(1) - - def _child_process(self, server): - # Setup child signal handlers differently - def _sigterm(*args): - signal.signal(signal.SIGTERM, signal.SIG_DFL) - raise SignalExit(signal.SIGTERM) - - signal.signal(signal.SIGTERM, _sigterm) - # Block SIGINT and let the parent send us a SIGTERM - # signal.signal(signal.SIGINT, signal.SIG_IGN) - # This differs from the behavior in nova in that we dont ignore this - # It allows the non-wsgi services to be terminated properly - signal.signal(signal.SIGINT, _sigterm) - - # Reopen the eventlet hub to make sure we don't share an epoll - # fd with parent and/or siblings, which would be bad - eventlet.hubs.use_hub() - - # Close write to ensure only parent has it open - os.close(self.writepipe) - # Create greenthread to watch for parent to close pipe - eventlet.spawn(self._pipe_watcher) - - # Reseed random number generator - random.seed() - - launcher = Launcher() - launcher.run_server(server) - - def _start_child(self, wrap): - if len(wrap.forktimes) > wrap.workers: - # Limit ourselves to one process a second (over the period of - # number of workers * 1 second). This will allow workers to - # start up quickly but ensure we don't fork off children that - # die instantly too quickly. - if time.time() - wrap.forktimes[0] < wrap.workers: - LOG.info(_LI('Forking too fast, sleeping')) - time.sleep(1) - - wrap.forktimes.pop(0) - - wrap.forktimes.append(time.time()) - - pid = os.fork() - if pid == 0: - # NOTE(johannes): All exceptions are caught to ensure this - # doesn't fallback into the loop spawning children. It would - # be bad for a child to spawn more children. - status = 0 - try: - self._child_process(wrap.server) - except SignalExit as exc: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[exc.signo] - LOG.info(_LI('Caught %s, exiting'), signame) - status = exc.code - except SystemExit as exc: - status = exc.code - except BaseException: - LOG.exception(_LE('Unhandled exception')) - status = 2 - finally: - wrap.server.stop() - - os._exit(status) - - LOG.info(_LI('Started child %d'), pid) - - wrap.children.add(pid) - self.children[pid] = wrap - - return pid - - def launch_server(self, server, workers=1): - wrap = ServerWrapper(server, workers) - self.totalwrap = self.totalwrap + 1 - LOG.info(_LI('Starting %d workers'), wrap.workers) - while (self.running and len(wrap.children) < wrap.workers - and not wrap.failed): - self._start_child(wrap) - - def _wait_child(self): - try: - # Don't block if no child processes have exited - pid, status = os.waitpid(0, os.WNOHANG) - if not pid: - return None - except OSError as exc: - if exc.errno not in (errno.EINTR, errno.ECHILD): - raise - return None - - code = 0 - if os.WIFSIGNALED(status): - sig = os.WTERMSIG(status) - LOG.info(_LI('Child %(pid)d killed by signal %(sig)d'), - {'pid': pid, 'sig': sig}) - else: - code = os.WEXITSTATUS(status) - LOG.info(_LI('Child %(pid)d exited with status %(code)d'), - {'pid': pid, 'code': code}) - - if pid not in self.children: - LOG.warning(_LW('pid %d not in child list'), pid) - return None - - wrap = self.children.pop(pid) - wrap.children.remove(pid) - if 2 == code: - wrap.failed = True - self.failedwrap = self.failedwrap + 1 - LOG.info(_LI('_wait_child %d'), self.failedwrap) - if self.failedwrap == self.totalwrap: - self.running = False - return wrap - - def wait(self): - """Loop waiting on children to die and respawning as necessary.""" - while self.running: - wrap = self._wait_child() - if not wrap: - # Yield to other threads if no children have exited - # Sleep for a short time to avoid excessive CPU usage - # (see bug #1095346) - eventlet.greenthread.sleep(.01) - continue - - LOG.info(_LI('wait wrap.failed %s'), wrap.failed) - while (self.running and len(wrap.children) < wrap.workers - and not wrap.failed): - self._start_child(wrap) - - if self.sigcaught: - signame = {signal.SIGTERM: 'SIGTERM', - signal.SIGINT: 'SIGINT'}[self.sigcaught] - LOG.info(_LI('Caught %s, stopping children'), signame) - - for pid in self.children: - try: - os.kill(pid, signal.SIGTERM) - except OSError as exc: - if exc.errno != errno.ESRCH: - raise - - # Wait for children to die - if self.children: - LOG.info(_LI('Waiting on %d children to exit'), len(self.children)) - while self.children: - self._wait_child() - - -class Service(object): +class Service(service.Service): """Service object for binaries running on hosts. A service takes a manager and enables rpc by listening to queues based @@ -469,6 +210,8 @@ class Service(object): pass self.timers = [] + super(Service, self).stop() + def wait(self): for x in self.timers: try: @@ -514,7 +257,7 @@ class Service(object): LOG.exception(_LE('model server went away')) -class WSGIService(object): +class WSGIService(service.ServiceBase): """Provides ability to launch API from a 'paste' configuration.""" def __init__(self, name, loader=None): @@ -533,6 +276,13 @@ class WSGIService(object): self.app = self.loader.load_app(name) self.host = getattr(CONF, '%s_listen' % name, "0.0.0.0") self.port = getattr(CONF, '%s_listen_port' % name, 0) + self.workers = getattr(CONF, '%s_workers' % name, None) + if self.workers < 1: + LOG.warn( + _LW("Value of config option %(name)s_workers must be integer " + "greater than 1. Input value ignored.") % {'name': name}) + # Reset workers to default + self.workers = None self.server = wsgi.Server(name, self.app, host=self.host, @@ -589,6 +339,17 @@ class WSGIService(object): """ self.server.wait() + def reset(self): + """Reset server greenpool size to default. + + :returns: None + """ + self.server.reset() + + +def process_launcher(): + return service.ProcessLauncher(CONF) + # NOTE(vish): the global launcher is to maintain the existing # functionality of calling service.serve + @@ -596,12 +357,11 @@ class WSGIService(object): _launcher = None -def serve(*servers): +def serve(server, workers=None): global _launcher - if not _launcher: - _launcher = Launcher() - for server in servers: - _launcher.launch_server(server) + if _launcher: + raise RuntimeError(_('serve() can only be called once')) + _launcher = service.launch(CONF, server, workers=workers) def wait(): diff --git a/manila/tests/cmd/test_all.py b/manila/tests/cmd/test_all.py index 016f75937c..c95dd29f0e 100644 --- a/manila/tests/cmd/test_all.py +++ b/manila/tests/cmd/test_all.py @@ -38,10 +38,9 @@ class ManilaCmdAllTestCase(test.TestCase): self.mock_object(log, 'register_options') self.mock_object(log, 'getLogger') self.mock_object(utils, 'monkey_patch') + self.mock_object(service, 'process_launcher') self.mock_object(service, 'WSGIService') self.mock_object(service.Service, 'create') - self.mock_object(service, 'serve') - self.mock_object(service, 'wait') self.wsgi_service = service.WSGIService.return_value self.service = service.Service.create.return_value self.fake_log = log.getLogger.return_value @@ -53,34 +52,31 @@ class ManilaCmdAllTestCase(test.TestCase): log.register_options.assert_called_once_with(CONF) log.getLogger.assert_called_once_with('manila.all') utils.monkey_patch.assert_called_once_with() + service.process_launcher.assert_called_once_with() service.WSGIService.assert_called_once_with('osapi_share') - service.wait.assert_called_once_with() def test_main(self): manila_all.main() self._common_checks() self.assertFalse(self.fake_log.exception.called) - service.serve.assert_has_calls([ - mock.call(self.wsgi_service, *[self.service] * 3) - ]) + self.assertTrue( + service.process_launcher.return_value.launch_service.called) + self.assertTrue(service.process_launcher.return_value.wait.called) - @ddt.data(Exception(), SystemExit()) - def test_main_wsgi_service_osapi_share_exception(self, exc): - service.WSGIService.side_effect = exc - - manila_all.main() - - self._common_checks() - self.fake_log.exception.assert_called_once_with(mock.ANY) - service.serve.assert_has_calls([mock.call(*[self.service] * 3)]) - - @ddt.data(Exception(), SystemExit()) - def test_main_service_create_exception(self, exc): - service.Service.create.side_effect = exc + @ddt.data( + *[(exc, exc_in_wsgi) + for exc in (Exception(), SystemExit()) + for exc_in_wsgi in (True, False)] + ) + @ddt.unpack + def test_main_raise_exception(self, exc, exc_in_wsgi): + if exc_in_wsgi: + service.WSGIService.side_effect = exc + else: + service.Service.create.side_effect = exc manila_all.main() self._common_checks() self.fake_log.exception.assert_has_calls([mock.ANY]) - service.serve.assert_has_calls([mock.call(self.wsgi_service)]) diff --git a/manila/tests/cmd/test_api.py b/manila/tests/cmd/test_api.py index f37431c08c..6b9186ed6b 100644 --- a/manila/tests/cmd/test_api.py +++ b/manila/tests/cmd/test_api.py @@ -31,18 +31,18 @@ class ManilaCmdApiTestCase(test.TestCase): self.mock_object(manila_api.log, 'setup') self.mock_object(manila_api.log, 'register_options') self.mock_object(manila_api.utils, 'monkey_patch') + self.mock_object(manila_api.service, 'process_launcher') self.mock_object(manila_api.service, 'WSGIService') - self.mock_object(manila_api.service, 'serve') - self.mock_object(manila_api.service, 'wait') manila_api.main() + process_launcher = manila_api.service.process_launcher + process_launcher.assert_called_once_with() + self.assertTrue(process_launcher.return_value.launch_service.called) + self.assertTrue(process_launcher.return_value.wait.called) self.assertEqual(CONF.project, 'manila') self.assertEqual(CONF.version, version.version_string()) manila_api.log.setup.assert_called_once_with(CONF, "manila") manila_api.log.register_options.assert_called_once_with(CONF) manila_api.utils.monkey_patch.assert_called_once_with() manila_api.service.WSGIService.assert_called_once_with('osapi_share') - manila_api.service.wait.assert_called_once_with() - manila_api.service.serve.assert_called_once_with( - manila_api.service.WSGIService.return_value) diff --git a/manila/tests/cmd/test_share.py b/manila/tests/cmd/test_share.py index fc792a9cba..f9bcf3c972 100644 --- a/manila/tests/cmd/test_share.py +++ b/manila/tests/cmd/test_share.py @@ -32,10 +32,10 @@ class ManilaCmdShareTestCase(test.TestCase): self.mock_object(manila_share.log, 'setup') self.mock_object(manila_share.log, 'register_options') self.mock_object(manila_share.utils, 'monkey_patch') - self.mock_object(manila_share.service, 'ProcessLauncher') + self.mock_object(manila_share.service, 'process_launcher') self.mock_object(manila_share.service.Service, 'create') - self.launcher = manila_share.service.ProcessLauncher.return_value - self.mock_object(self.launcher, 'launch_server') + self.launcher = manila_share.service.process_launcher.return_value + self.mock_object(self.launcher, 'launch_service') self.mock_object(self.launcher, 'wait') self.server = manila_share.service.Service.create.return_value fake_host = 'fake_host' @@ -48,7 +48,7 @@ class ManilaCmdShareTestCase(test.TestCase): manila_share.log.setup.assert_called_once_with(CONF, "manila") manila_share.log.register_options.assert_called_once_with(CONF) manila_share.utils.monkey_patch.assert_called_once_with() - manila_share.service.ProcessLauncher.assert_called_once_with() + manila_share.service.process_launcher.assert_called_once_with() self.launcher.wait.assert_called_once_with() if backends: @@ -58,9 +58,9 @@ class ManilaCmdShareTestCase(test.TestCase): service_name=backend, binary='manila-share') for backend in backends ]) - self.launcher.launch_server.assert_has_calls([ + self.launcher.launch_service.assert_has_calls([ mock.call(self.server) for backend in backends]) else: manila_share.service.Service.create.assert_called_once_with( binary='manila-share') - self.launcher.launch_server.assert_called_once_with(self.server) + self.launcher.launch_service.assert_called_once_with(self.server) diff --git a/manila/tests/test_service.py b/manila/tests/test_service.py index cc0a1bf0dc..b36ee38a18 100644 --- a/manila/tests/test_service.py +++ b/manila/tests/test_service.py @@ -184,24 +184,27 @@ class ServiceTestCase(test.TestCase): class TestWSGIService(test.TestCase): - @mock.patch.object(wsgi.Loader, 'load_app', mock.Mock()) + def setUp(self): + super(self.__class__, self).setUp() + self.mock_object(wsgi.Loader, 'load_app') + self.test_service = service.WSGIService("test_service") + def test_service_random_port(self): - test_service = service.WSGIService("test_service") - self.assertEqual(0, test_service.port) - test_service.start() - self.assertNotEqual(0, test_service.port) - test_service.stop() + self.assertEqual(0, self.test_service.port) + self.test_service.start() + self.assertNotEqual(0, self.test_service.port) + self.test_service.stop() wsgi.Loader.load_app.assert_called_once_with("test_service") + def test_reset_pool_size_to_default(self): + self.test_service.start() -class TestLauncher(test.TestCase): + # Stopping the service, which in turn sets pool size to 0 + self.test_service.stop() + self.assertEqual(0, self.test_service.server._pool.size) - @mock.patch.object(wsgi.Loader, 'load_app', mock.Mock()) - def test_launch_app(self): - self.service = service.WSGIService("test_service") - self.assertEqual(0, self.service.port) - launcher = service.Launcher() - launcher.launch_server(self.service) - self.assertEqual(0, self.service.port) - launcher.stop() + # Resetting pool size to default + self.test_service.reset() + self.test_service.start() + self.assertEqual(1000, self.test_service.server._pool.size) wsgi.Loader.load_app.assert_called_once_with("test_service") diff --git a/manila/tests/test_wsgi.py b/manila/tests/test_wsgi.py index d9bdcf7811..d5ce7703e9 100644 --- a/manila/tests/test_wsgi.py +++ b/manila/tests/test_wsgi.py @@ -21,6 +21,8 @@ import ssl import tempfile import urllib2 +import eventlet +import mock from oslo_config import cfg import six import testtools @@ -87,12 +89,11 @@ class TestWSGIServer(test.TestCase): """WSGI server tests.""" def test_no_app(self): - server = manila.wsgi.Server("test_app", None) + server = manila.wsgi.Server("test_app", None, host="127.0.0.1", port=0) self.assertEqual("test_app", server.name) def test_start_random_port(self): server = manila.wsgi.Server("test_random_port", None, host="127.0.0.1") - self.assertEqual(0, server.port) server.start() self.assertNotEqual(0, server.port) server.stop() @@ -113,6 +114,8 @@ class TestWSGIServer(test.TestCase): server.wait() def test_app(self): + self.mock_object( + eventlet, 'spawn', mock.Mock(side_effect=eventlet.spawn)) greetings = 'Hello, World!!!' def hello_world(env, start_response): @@ -123,12 +126,23 @@ class TestWSGIServer(test.TestCase): start_response('200 OK', [('Content-Type', 'text/plain')]) return [greetings] - server = manila.wsgi.Server("test_app", hello_world) + server = manila.wsgi.Server( + "test_app", hello_world, host="127.0.0.1", port=0) server.start() response = urllib2.urlopen('http://127.0.0.1:%d/' % server.port) self.assertEqual(greetings, response.read()) + # Verify provided parameters to eventlet.spawn func + eventlet.spawn.assert_called_once_with( + func=eventlet.wsgi.server, + sock=mock.ANY, + site=server.app, + protocol=server._protocol, + custom_pool=server._pool, + log=server._logger, + ) + server.stop() def test_app_using_ssl(self): @@ -143,7 +157,8 @@ class TestWSGIServer(test.TestCase): def hello_world(req): return greetings - server = manila.wsgi.Server("test_app", hello_world) + server = manila.wsgi.Server( + "test_app", hello_world, host="127.0.0.1", port=0) server.start() if hasattr(ssl, '_create_unverified_context'): @@ -190,6 +205,19 @@ class TestWSGIServer(test.TestCase): server.stop() + def test_reset_pool_size_to_default(self): + server = manila.wsgi.Server("test_resize", None, host="127.0.0.1") + server.start() + + # Stopping the server, which in turn sets pool size to 0 + server.stop() + self.assertEqual(0, server._pool.size) + + # Resetting pool size to default + server.reset() + server.start() + self.assertEqual(1000, server._pool.size) + class ExceptionTest(test.TestCase): diff --git a/manila/wsgi.py b/manila/wsgi.py index 43d6098d0b..0415ac2e01 100644 --- a/manila/wsgi.py +++ b/manila/wsgi.py @@ -31,7 +31,8 @@ import eventlet.wsgi import greenlet from oslo_config import cfg from oslo_log import log -from oslo_utils import netutils +from oslo_service import service +from oslo_utils import excutils from paste import deploy import routes.middleware import six @@ -40,6 +41,7 @@ import webob.exc from manila import exception from manila.i18n import _ +from manila.i18n import _LE from manila.i18n import _LI socket_opts = [ @@ -91,13 +93,13 @@ CONF.register_opts(eventlet_opts) LOG = log.getLogger(__name__) -class Server(object): +class Server(service.ServiceBase): """Server class to manage a WSGI server, serving a WSGI application.""" default_pool_size = 1000 def __init__(self, name, app, host=None, port=None, pool_size=None, - protocol=eventlet.wsgi.HttpProtocol): + protocol=eventlet.wsgi.HttpProtocol, backlog=128): """Initialize, but do not start, a WSGI server. :param name: Pretty name for logging. @@ -116,10 +118,14 @@ class Server(object): self._server = None self._socket = None self._protocol = protocol - self._pool = eventlet.GreenPool(pool_size or self.default_pool_size) + self.pool_size = pool_size or self.default_pool_size + self._pool = eventlet.GreenPool(self.pool_size) self._logger = log.getLogger("eventlet.wsgi.server") - def _get_socket(self, host, port, backlog): + if backlog < 1: + raise exception.InvalidInput( + reason='The backlog must be more than 1') + bind_addr = (host, port) # TODO(dims): eventlet's green dns/socket module does not actually # support IPv6 in getaddrinfo(). We need to get around this in the @@ -137,7 +143,7 @@ class Server(object): cert_file = CONF.ssl_cert_file key_file = CONF.ssl_key_file ca_file = CONF.ssl_ca_file - use_ssl = cert_file or key_file + self._use_ssl = cert_file or key_file if cert_file and not os.path.exists(cert_file): raise RuntimeError(_("Unable to find cert_file : %s") % cert_file) @@ -148,85 +154,85 @@ class Server(object): if key_file and not os.path.exists(key_file): raise RuntimeError(_("Unable to find key_file : %s") % key_file) - if use_ssl and (not cert_file or not key_file): + if self._use_ssl and (not cert_file or not key_file): raise RuntimeError(_("When running server in SSL mode, you must " "specify both a cert_file and key_file " "option value in your configuration file")) - def wrap_ssl(sock): - ssl_kwargs = { - 'server_side': True, - 'certfile': cert_file, - 'keyfile': key_file, - 'cert_reqs': ssl.CERT_NONE, - } - - if CONF.ssl_ca_file: - ssl_kwargs['ca_certs'] = ca_file - ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED - - return ssl.wrap_socket(sock, **ssl_kwargs) - - sock = None retry_until = time.time() + 30 - while not sock and time.time() < retry_until: + while not self._socket and time.time() < retry_until: try: - sock = eventlet.listen(bind_addr, - backlog=backlog, - family=family) - if use_ssl: - sock = wrap_ssl(sock) - + self._socket = eventlet.listen( + bind_addr, backlog=backlog, family=family) except socket.error as err: if err.args[0] != errno.EADDRINUSE: raise eventlet.sleep(0.1) - if not sock: + + if not self._socket: raise RuntimeError(_("Could not bind to %(host)s:%(port)s " "after trying for 30 seconds") % {'host': host, 'port': port}) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - # sockets can hang around forever without keepalive - netutils.set_tcp_keepalive(sock, - CONF.tcp_keepalive, - CONF.tcp_keepidle, - CONF.tcp_keepalive_interval, - CONF.tcp_keepalive_count) + (self._host, self._port) = self._socket.getsockname()[0:2] + LOG.info(_LI("%(name)s listening on %(_host)s:%(_port)s"), + {'name': self.name, '_host': self._host, '_port': self._port}) - return sock - - def _start(self): - """Run the blocking eventlet WSGI server. - - :returns: None - - """ - eventlet.wsgi.server(self._socket, - self.app, - protocol=self._protocol, - custom_pool=self._pool, - log=self._logger) - - def start(self, backlog=128): + def start(self): """Start serving a WSGI application. - :param backlog: Maximum number of queued connections. :returns: None :raises: manila.exception.InvalidInput """ - if backlog < 1: - raise exception.InvalidInput( - reason='The backlog must be more than 1') + # The server socket object will be closed after server exits, + # but the underlying file descriptor will remain open, and will + # give bad file descriptor error. So duplicating the socket object, + # to keep file descriptor usable. - self._socket = self._get_socket(self._host, - self._port, - backlog=backlog) - self._server = eventlet.spawn(self._start) - (self._host, self._port) = self._socket.getsockname()[0:2] - LOG.info(_LI("Started %(name)s on %(_host)s:%(_port)s"), - {'name': self.name, '_host': self._host, '_port': self._port}) + dup_socket = self._socket.dup() + if self._use_ssl: + try: + ssl_kwargs = { + 'server_side': True, + 'certfile': CONF.ssl_cert_file, + 'keyfile': CONF.ssl_key_file, + 'cert_reqs': ssl.CERT_NONE, + } + + if CONF.ssl_ca_file: + ssl_kwargs['ca_certs'] = CONF.ssl_ca_file + ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED + + dup_socket = ssl.wrap_socket(dup_socket, + **ssl_kwargs) + + dup_socket.setsockopt(socket.SOL_SOCKET, + socket.SO_REUSEADDR, 1) + + # sockets can hang around forever without keepalive + dup_socket.setsockopt(socket.SOL_SOCKET, + socket.SO_KEEPALIVE, 1) + + except Exception: + with excutils.save_and_reraise_exception(): + LOG.error( + _LE("Failed to start %(name)s on %(_host)s:%(_port)s " + "with SSL support."), + {"name": self.name, "_host": self._host, + "_port": self._port} + ) + + wsgi_kwargs = { + 'func': eventlet.wsgi.server, + 'sock': dup_socket, + 'site': self.app, + 'protocol': self._protocol, + 'custom_pool': self._pool, + 'log': self._logger, + } + + self._server = eventlet.spawn(**wsgi_kwargs) @property def host(self): @@ -246,7 +252,10 @@ class Server(object): """ LOG.info(_LI("Stopping WSGI server.")) - self._server.kill() + if self._server is not None: + # Resize pool to stop new requests from being processed + self._pool.resize(0) + self._server.kill() def wait(self): """Block, until the server has stopped. @@ -257,10 +266,19 @@ class Server(object): """ try: - self._server.wait() + if self._server is not None: + self._pool.waitall() + self._server.wait() except greenlet.GreenletExit: LOG.info(_LI("WSGI server has stopped.")) + def reset(self): + """Reset server greenpool size to default. + + :returns: None + """ + self._pool.resize(self.pool_size) + class Request(webob.Request): pass