Use oslo.service for launching sahara
Changes: * use launcher for running sahara modules * use wsgi server from oslo.service library Change-Id: I9ba8f1e751ce9bb9b9adcebda6509107b98e2fc8
This commit is contained in:
parent
4d39b4aad6
commit
2679167bfb
@ -57,4 +57,6 @@ def main():
|
||||
server.setup_sahara_engine()
|
||||
server.setup_auth_policy()
|
||||
|
||||
server.start_server(app)
|
||||
launcher = server.get_process_launcher()
|
||||
launcher.launch_service(server.SaharaWSGIService("sahara-all", app))
|
||||
launcher.wait()
|
||||
|
@ -55,4 +55,6 @@ def main():
|
||||
server.setup_sahara_api('distributed')
|
||||
server.setup_auth_policy()
|
||||
|
||||
server.start_server(app)
|
||||
launcher = server.get_process_launcher()
|
||||
api_service = server.SaharaWSGIService("sahara-api", app)
|
||||
server.launch_api_service(launcher, api_service)
|
||||
|
@ -51,4 +51,6 @@ def main():
|
||||
server.setup_sahara_engine()
|
||||
|
||||
ops_server = ops.OpsServer()
|
||||
ops_server.start()
|
||||
launcher = server.get_process_launcher()
|
||||
launcher.launch_service(ops_server.get_service())
|
||||
launcher.wait()
|
||||
|
@ -143,7 +143,6 @@ def list_opts():
|
||||
from sahara.utils import poll_utils
|
||||
from sahara.utils import proxy
|
||||
from sahara.utils import ssh_remote
|
||||
from sahara.utils import wsgi
|
||||
|
||||
return [
|
||||
(None,
|
||||
@ -161,7 +160,6 @@ def list_opts():
|
||||
periodic.periodic_opts,
|
||||
proxy.opts,
|
||||
cpo.event_log_opts,
|
||||
wsgi.wsgi_opts,
|
||||
base.opts,
|
||||
heat_engine.heat_engine_opts,
|
||||
ssh_remote.ssh_config_options)),
|
||||
|
@ -17,7 +17,8 @@ import os
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_service import systemd
|
||||
from oslo_service import service as oslo_service
|
||||
from oslo_service import sslutils
|
||||
from oslo_service import wsgi as oslo_wsgi
|
||||
import stevedore
|
||||
|
||||
@ -33,7 +34,6 @@ from sahara.service import periodic
|
||||
from sahara.utils.openstack import cinder
|
||||
from sahara.utils import remote
|
||||
from sahara.utils import rpc as messaging
|
||||
from sahara.utils import wsgi
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@ -49,7 +49,7 @@ opts = [
|
||||
default='ssh',
|
||||
help='A method for Sahara to execute commands '
|
||||
'on VMs.'),
|
||||
cfg.IntOpt('api_workers', default=0,
|
||||
cfg.IntOpt('api_workers', default=1,
|
||||
help="Number of workers for Sahara API service (0 means "
|
||||
"all-in-one-thread configuration).")
|
||||
]
|
||||
@ -58,6 +58,13 @@ CONF = cfg.CONF
|
||||
CONF.register_opts(opts)
|
||||
|
||||
|
||||
class SaharaWSGIService(oslo_wsgi.Server):
|
||||
def __init__(self, service_name, app):
|
||||
super(SaharaWSGIService, self).__init__(
|
||||
CONF, service_name, app, host=CONF.host, port=CONF.port,
|
||||
use_ssl=sslutils.is_enabled(CONF))
|
||||
|
||||
|
||||
def setup_common(possible_topdir, service_name):
|
||||
dev_conf = os.path.join(possible_topdir,
|
||||
'etc',
|
||||
@ -145,8 +152,10 @@ def _get_ops_driver(driver_name):
|
||||
return _load_driver('sahara.run.mode', driver_name)
|
||||
|
||||
|
||||
def start_server(app):
|
||||
server = wsgi.Server()
|
||||
server.start(app)
|
||||
systemd.notify_once()
|
||||
server.wait()
|
||||
def get_process_launcher():
|
||||
return oslo_service.ProcessLauncher(CONF)
|
||||
|
||||
|
||||
def launch_api_service(launcher, service):
|
||||
launcher.launch_service(service, workers=CONF.api_workers)
|
||||
launcher.wait()
|
||||
|
@ -89,9 +89,8 @@ class RPCServer(object):
|
||||
endpoints=[self],
|
||||
executor='eventlet')
|
||||
|
||||
def start(self):
|
||||
self.__server.start()
|
||||
self.__server.wait()
|
||||
def get_service(self):
|
||||
return self.__server
|
||||
|
||||
|
||||
def setup():
|
||||
|
@ -19,35 +19,17 @@
|
||||
"""Utility methods for working with WSGI servers."""
|
||||
|
||||
import datetime
|
||||
import errno
|
||||
import os
|
||||
import signal
|
||||
|
||||
import eventlet
|
||||
from eventlet import wsgi
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_service import sslutils
|
||||
|
||||
from sahara import exceptions
|
||||
from sahara.i18n import _
|
||||
from sahara.i18n import _LE
|
||||
from sahara.i18n import _LI
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
wsgi_opts = [
|
||||
cfg.IntOpt('max_header_line',
|
||||
default=16384,
|
||||
help="Maximum line size of message headers to be accepted. "
|
||||
"max_header_line may need to be increased when using "
|
||||
"large tokens (typically those generated by the "
|
||||
"Keystone v3 API with big service catalogs)."),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(wsgi_opts)
|
||||
|
||||
|
||||
class ActionDispatcher(object):
|
||||
@ -106,113 +88,3 @@ class JSONDeserializer(TextDeserializer):
|
||||
|
||||
def default(self, datastring):
|
||||
return {'body': self._from_json(datastring)}
|
||||
|
||||
|
||||
class Server(object):
|
||||
"""Server class to manage multiple WSGI sockets and applications."""
|
||||
|
||||
def __init__(self, threads=500):
|
||||
eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
|
||||
self.threads = threads
|
||||
self.children = []
|
||||
self.running = True
|
||||
|
||||
def start(self, application):
|
||||
"""Run a WSGI server with the given application.
|
||||
|
||||
:param application: The application to run in the WSGI server
|
||||
"""
|
||||
def kill_children(*args):
|
||||
"""Kills the entire process group."""
|
||||
LOG.error(_LE('SIGTERM received'))
|
||||
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
||||
self.running = False
|
||||
os.killpg(0, signal.SIGTERM)
|
||||
|
||||
def hup(*args):
|
||||
"""Shuts down the server(s).
|
||||
|
||||
Shuts down the server(s), but allows running requests to complete
|
||||
"""
|
||||
LOG.error(_LE('SIGHUP received'))
|
||||
signal.signal(signal.SIGHUP, signal.SIG_IGN)
|
||||
os.killpg(0, signal.SIGHUP)
|
||||
signal.signal(signal.SIGHUP, hup)
|
||||
|
||||
self.application = application
|
||||
self.sock = eventlet.listen((CONF.host, CONF.port), backlog=500)
|
||||
if sslutils.is_enabled(CONF):
|
||||
LOG.info(_LI("Using HTTPS for port %s"), CONF.port)
|
||||
self.sock = sslutils.wrap(CONF, self.sock)
|
||||
|
||||
if CONF.api_workers == 0:
|
||||
# Useful for profiling, test, debug etc.
|
||||
self.pool = eventlet.GreenPool(size=self.threads)
|
||||
self.pool.spawn_n(self._single_run, application, self.sock)
|
||||
return
|
||||
|
||||
LOG.debug("Starting %d workers", CONF.api_workers)
|
||||
signal.signal(signal.SIGTERM, kill_children)
|
||||
signal.signal(signal.SIGHUP, hup)
|
||||
while len(self.children) < CONF.api_workers:
|
||||
self.run_child()
|
||||
|
||||
def wait_on_children(self):
|
||||
while self.running:
|
||||
try:
|
||||
pid, status = os.wait()
|
||||
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
|
||||
if pid in self.children:
|
||||
LOG.error(_LE('Removing dead child %s'), pid)
|
||||
self.children.remove(pid)
|
||||
self.run_child()
|
||||
except OSError as err:
|
||||
if err.errno not in (errno.EINTR, errno.ECHILD):
|
||||
raise
|
||||
except KeyboardInterrupt:
|
||||
LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
|
||||
os.killpg(0, signal.SIGTERM)
|
||||
break
|
||||
eventlet.greenio.shutdown_safe(self.sock)
|
||||
self.sock.close()
|
||||
LOG.debug('Server exited')
|
||||
|
||||
def wait(self):
|
||||
"""Wait until all servers have completed running."""
|
||||
try:
|
||||
if self.children:
|
||||
self.wait_on_children()
|
||||
else:
|
||||
self.pool.waitall()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
def run_child(self):
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
signal.signal(signal.SIGHUP, signal.SIG_DFL)
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
self.run_server()
|
||||
LOG.debug('Child %d exiting normally', os.getpid())
|
||||
return
|
||||
else:
|
||||
LOG.info(_LI('Started child %s'), pid)
|
||||
self.children.append(pid)
|
||||
|
||||
def run_server(self):
|
||||
"""Run a WSGI server."""
|
||||
self.pool = eventlet.GreenPool(size=self.threads)
|
||||
wsgi.server(self.sock,
|
||||
self.application,
|
||||
custom_pool=self.pool,
|
||||
log=LOG,
|
||||
debug=False)
|
||||
self.pool.waitall()
|
||||
|
||||
def _single_run(self, application, sock):
|
||||
"""Start a WSGI server in a new green thread."""
|
||||
LOG.info(_LI("Starting single process server"))
|
||||
eventlet.wsgi.server(sock, application,
|
||||
custom_pool=self.pool,
|
||||
log=LOG,
|
||||
debug=False)
|
||||
|
@ -10,3 +10,4 @@ namespace = oslo.middleware.cors
|
||||
namespace = oslo.policy
|
||||
namespace = oslo.service.periodic_task
|
||||
namespace = oslo.service.sslutils
|
||||
namespace = oslo.service.wsgi
|
||||
|
Loading…
Reference in New Issue
Block a user