Merge "Use oslo.service for launching sahara"
This commit is contained in:
commit
f0d11daf6d
@ -57,4 +57,6 @@ def main():
|
|||||||
server.setup_sahara_engine()
|
server.setup_sahara_engine()
|
||||||
server.setup_auth_policy()
|
server.setup_auth_policy()
|
||||||
|
|
||||||
server.start_server(app)
|
launcher = server.get_process_launcher()
|
||||||
|
launcher.launch_service(server.SaharaWSGIService("sahara-all", app))
|
||||||
|
launcher.wait()
|
||||||
|
@ -55,4 +55,6 @@ def main():
|
|||||||
server.setup_sahara_api('distributed')
|
server.setup_sahara_api('distributed')
|
||||||
server.setup_auth_policy()
|
server.setup_auth_policy()
|
||||||
|
|
||||||
server.start_server(app)
|
launcher = server.get_process_launcher()
|
||||||
|
api_service = server.SaharaWSGIService("sahara-api", app)
|
||||||
|
server.launch_api_service(launcher, api_service)
|
||||||
|
@ -51,4 +51,6 @@ def main():
|
|||||||
server.setup_sahara_engine()
|
server.setup_sahara_engine()
|
||||||
|
|
||||||
ops_server = ops.OpsServer()
|
ops_server = ops.OpsServer()
|
||||||
ops_server.start()
|
launcher = server.get_process_launcher()
|
||||||
|
launcher.launch_service(ops_server.get_service())
|
||||||
|
launcher.wait()
|
||||||
|
@ -143,7 +143,6 @@ def list_opts():
|
|||||||
from sahara.utils import poll_utils
|
from sahara.utils import poll_utils
|
||||||
from sahara.utils import proxy
|
from sahara.utils import proxy
|
||||||
from sahara.utils import ssh_remote
|
from sahara.utils import ssh_remote
|
||||||
from sahara.utils import wsgi
|
|
||||||
|
|
||||||
return [
|
return [
|
||||||
(None,
|
(None,
|
||||||
@ -161,7 +160,6 @@ def list_opts():
|
|||||||
periodic.periodic_opts,
|
periodic.periodic_opts,
|
||||||
proxy.opts,
|
proxy.opts,
|
||||||
cpo.event_log_opts,
|
cpo.event_log_opts,
|
||||||
wsgi.wsgi_opts,
|
|
||||||
base.opts,
|
base.opts,
|
||||||
heat_engine.heat_engine_opts,
|
heat_engine.heat_engine_opts,
|
||||||
ssh_remote.ssh_config_options)),
|
ssh_remote.ssh_config_options)),
|
||||||
|
@ -17,7 +17,8 @@ import os
|
|||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
from oslo_service import systemd
|
from oslo_service import service as oslo_service
|
||||||
|
from oslo_service import sslutils
|
||||||
from oslo_service import wsgi as oslo_wsgi
|
from oslo_service import wsgi as oslo_wsgi
|
||||||
import stevedore
|
import stevedore
|
||||||
|
|
||||||
@ -33,7 +34,6 @@ from sahara.service import periodic
|
|||||||
from sahara.utils.openstack import cinder
|
from sahara.utils.openstack import cinder
|
||||||
from sahara.utils import remote
|
from sahara.utils import remote
|
||||||
from sahara.utils import rpc as messaging
|
from sahara.utils import rpc as messaging
|
||||||
from sahara.utils import wsgi
|
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
@ -49,7 +49,7 @@ opts = [
|
|||||||
default='ssh',
|
default='ssh',
|
||||||
help='A method for Sahara to execute commands '
|
help='A method for Sahara to execute commands '
|
||||||
'on VMs.'),
|
'on VMs.'),
|
||||||
cfg.IntOpt('api_workers', default=0,
|
cfg.IntOpt('api_workers', default=1,
|
||||||
help="Number of workers for Sahara API service (0 means "
|
help="Number of workers for Sahara API service (0 means "
|
||||||
"all-in-one-thread configuration).")
|
"all-in-one-thread configuration).")
|
||||||
]
|
]
|
||||||
@ -58,6 +58,13 @@ CONF = cfg.CONF
|
|||||||
CONF.register_opts(opts)
|
CONF.register_opts(opts)
|
||||||
|
|
||||||
|
|
||||||
|
class SaharaWSGIService(oslo_wsgi.Server):
|
||||||
|
def __init__(self, service_name, app):
|
||||||
|
super(SaharaWSGIService, self).__init__(
|
||||||
|
CONF, service_name, app, host=CONF.host, port=CONF.port,
|
||||||
|
use_ssl=sslutils.is_enabled(CONF))
|
||||||
|
|
||||||
|
|
||||||
def setup_common(possible_topdir, service_name):
|
def setup_common(possible_topdir, service_name):
|
||||||
dev_conf = os.path.join(possible_topdir,
|
dev_conf = os.path.join(possible_topdir,
|
||||||
'etc',
|
'etc',
|
||||||
@ -145,8 +152,10 @@ def _get_ops_driver(driver_name):
|
|||||||
return _load_driver('sahara.run.mode', driver_name)
|
return _load_driver('sahara.run.mode', driver_name)
|
||||||
|
|
||||||
|
|
||||||
def start_server(app):
|
def get_process_launcher():
|
||||||
server = wsgi.Server()
|
return oslo_service.ProcessLauncher(CONF)
|
||||||
server.start(app)
|
|
||||||
systemd.notify_once()
|
|
||||||
server.wait()
|
def launch_api_service(launcher, service):
|
||||||
|
launcher.launch_service(service, workers=CONF.api_workers)
|
||||||
|
launcher.wait()
|
||||||
|
@ -89,9 +89,8 @@ class RPCServer(object):
|
|||||||
endpoints=[self],
|
endpoints=[self],
|
||||||
executor='eventlet')
|
executor='eventlet')
|
||||||
|
|
||||||
def start(self):
|
def get_service(self):
|
||||||
self.__server.start()
|
return self.__server
|
||||||
self.__server.wait()
|
|
||||||
|
|
||||||
|
|
||||||
def setup():
|
def setup():
|
||||||
|
@ -19,35 +19,17 @@
|
|||||||
"""Utility methods for working with WSGI servers."""
|
"""Utility methods for working with WSGI servers."""
|
||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
import errno
|
|
||||||
import os
|
|
||||||
import signal
|
|
||||||
|
|
||||||
import eventlet
|
|
||||||
from eventlet import wsgi
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from oslo_serialization import jsonutils
|
from oslo_serialization import jsonutils
|
||||||
from oslo_service import sslutils
|
|
||||||
|
|
||||||
from sahara import exceptions
|
from sahara import exceptions
|
||||||
from sahara.i18n import _
|
from sahara.i18n import _
|
||||||
from sahara.i18n import _LE
|
|
||||||
from sahara.i18n import _LI
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
wsgi_opts = [
|
|
||||||
cfg.IntOpt('max_header_line',
|
|
||||||
default=16384,
|
|
||||||
help="Maximum line size of message headers to be accepted. "
|
|
||||||
"max_header_line may need to be increased when using "
|
|
||||||
"large tokens (typically those generated by the "
|
|
||||||
"Keystone v3 API with big service catalogs)."),
|
|
||||||
]
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
CONF.register_opts(wsgi_opts)
|
|
||||||
|
|
||||||
|
|
||||||
class ActionDispatcher(object):
|
class ActionDispatcher(object):
|
||||||
@ -106,113 +88,3 @@ class JSONDeserializer(TextDeserializer):
|
|||||||
|
|
||||||
def default(self, datastring):
|
def default(self, datastring):
|
||||||
return {'body': self._from_json(datastring)}
|
return {'body': self._from_json(datastring)}
|
||||||
|
|
||||||
|
|
||||||
class Server(object):
|
|
||||||
"""Server class to manage multiple WSGI sockets and applications."""
|
|
||||||
|
|
||||||
def __init__(self, threads=500):
|
|
||||||
eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
|
|
||||||
self.threads = threads
|
|
||||||
self.children = []
|
|
||||||
self.running = True
|
|
||||||
|
|
||||||
def start(self, application):
|
|
||||||
"""Run a WSGI server with the given application.
|
|
||||||
|
|
||||||
:param application: The application to run in the WSGI server
|
|
||||||
"""
|
|
||||||
def kill_children(*args):
|
|
||||||
"""Kills the entire process group."""
|
|
||||||
LOG.error(_LE('SIGTERM received'))
|
|
||||||
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
|
||||||
self.running = False
|
|
||||||
os.killpg(0, signal.SIGTERM)
|
|
||||||
|
|
||||||
def hup(*args):
|
|
||||||
"""Shuts down the server(s).
|
|
||||||
|
|
||||||
Shuts down the server(s), but allows running requests to complete
|
|
||||||
"""
|
|
||||||
LOG.error(_LE('SIGHUP received'))
|
|
||||||
signal.signal(signal.SIGHUP, signal.SIG_IGN)
|
|
||||||
os.killpg(0, signal.SIGHUP)
|
|
||||||
signal.signal(signal.SIGHUP, hup)
|
|
||||||
|
|
||||||
self.application = application
|
|
||||||
self.sock = eventlet.listen((CONF.host, CONF.port), backlog=500)
|
|
||||||
if sslutils.is_enabled(CONF):
|
|
||||||
LOG.info(_LI("Using HTTPS for port %s"), CONF.port)
|
|
||||||
self.sock = sslutils.wrap(CONF, self.sock)
|
|
||||||
|
|
||||||
if CONF.api_workers == 0:
|
|
||||||
# Useful for profiling, test, debug etc.
|
|
||||||
self.pool = eventlet.GreenPool(size=self.threads)
|
|
||||||
self.pool.spawn_n(self._single_run, application, self.sock)
|
|
||||||
return
|
|
||||||
|
|
||||||
LOG.debug("Starting %d workers", CONF.api_workers)
|
|
||||||
signal.signal(signal.SIGTERM, kill_children)
|
|
||||||
signal.signal(signal.SIGHUP, hup)
|
|
||||||
while len(self.children) < CONF.api_workers:
|
|
||||||
self.run_child()
|
|
||||||
|
|
||||||
def wait_on_children(self):
|
|
||||||
while self.running:
|
|
||||||
try:
|
|
||||||
pid, status = os.wait()
|
|
||||||
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
|
|
||||||
if pid in self.children:
|
|
||||||
LOG.error(_LE('Removing dead child %s'), pid)
|
|
||||||
self.children.remove(pid)
|
|
||||||
self.run_child()
|
|
||||||
except OSError as err:
|
|
||||||
if err.errno not in (errno.EINTR, errno.ECHILD):
|
|
||||||
raise
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
|
|
||||||
os.killpg(0, signal.SIGTERM)
|
|
||||||
break
|
|
||||||
eventlet.greenio.shutdown_safe(self.sock)
|
|
||||||
self.sock.close()
|
|
||||||
LOG.debug('Server exited')
|
|
||||||
|
|
||||||
def wait(self):
|
|
||||||
"""Wait until all servers have completed running."""
|
|
||||||
try:
|
|
||||||
if self.children:
|
|
||||||
self.wait_on_children()
|
|
||||||
else:
|
|
||||||
self.pool.waitall()
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def run_child(self):
|
|
||||||
pid = os.fork()
|
|
||||||
if pid == 0:
|
|
||||||
signal.signal(signal.SIGHUP, signal.SIG_DFL)
|
|
||||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
|
||||||
self.run_server()
|
|
||||||
LOG.debug('Child %d exiting normally', os.getpid())
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
LOG.info(_LI('Started child %s'), pid)
|
|
||||||
self.children.append(pid)
|
|
||||||
|
|
||||||
def run_server(self):
|
|
||||||
"""Run a WSGI server."""
|
|
||||||
self.pool = eventlet.GreenPool(size=self.threads)
|
|
||||||
wsgi.server(self.sock,
|
|
||||||
self.application,
|
|
||||||
custom_pool=self.pool,
|
|
||||||
log=LOG,
|
|
||||||
debug=False)
|
|
||||||
self.pool.waitall()
|
|
||||||
|
|
||||||
def _single_run(self, application, sock):
|
|
||||||
"""Start a WSGI server in a new green thread."""
|
|
||||||
LOG.info(_LI("Starting single process server"))
|
|
||||||
eventlet.wsgi.server(sock, application,
|
|
||||||
custom_pool=self.pool,
|
|
||||||
log=LOG,
|
|
||||||
debug=False)
|
|
||||||
|
@ -10,3 +10,4 @@ namespace = oslo.middleware.cors
|
|||||||
namespace = oslo.policy
|
namespace = oslo.policy
|
||||||
namespace = oslo.service.periodic_task
|
namespace = oslo.service.periodic_task
|
||||||
namespace = oslo.service.sslutils
|
namespace = oslo.service.sslutils
|
||||||
|
namespace = oslo.service.wsgi
|
||||||
|
Loading…
Reference in New Issue
Block a user