Implemented multi-worker solution for Sahara API
Implemented temporary solution for wsgi/flask. Should be replaced during migration to Pecan/WSME. Code is based on Heat code. Implements blueprint: sahara-api-workers Change-Id: Ica0ecdc58a1ab345b437e78590a5b29e1d0f3789
This commit is contained in:
parent
bcd1c76651
commit
349c140bbd
@ -407,6 +407,10 @@
|
||||
# A method for Sahara to execute commands on VMs. (string value)
|
||||
#remote = ssh
|
||||
|
||||
# Number of workers for Sahara API service (0 means all-in-one-thread
|
||||
# configuration). (integer value)
|
||||
#api_workers = 0
|
||||
|
||||
# Postfix for storing jobs in hdfs. Will be added to '/user/<hdfs
|
||||
# user>/' path. (string value)
|
||||
#job_workflow_postfix =
|
||||
|
@ -4,6 +4,7 @@
|
||||
module=periodic_task
|
||||
module=policy
|
||||
module=sslutils
|
||||
module=systemd
|
||||
module=threadgroup
|
||||
|
||||
# The base module to hold the copy of openstack.common
|
||||
|
@ -15,12 +15,9 @@
|
||||
|
||||
import os
|
||||
|
||||
import eventlet
|
||||
from eventlet import wsgi
|
||||
import flask
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_log import loggers
|
||||
import six
|
||||
import stevedore
|
||||
from werkzeug import exceptions as werkzeug_exceptions
|
||||
@ -33,7 +30,7 @@ from sahara.api import v11 as api_v11
|
||||
from sahara import config
|
||||
from sahara import context
|
||||
from sahara.i18n import _LI
|
||||
from sahara.openstack.common import sslutils
|
||||
from sahara.openstack.common import systemd
|
||||
from sahara.plugins import base as plugins_base
|
||||
from sahara.service import api as service_api
|
||||
from sahara.service.edp import api as edp_api
|
||||
@ -43,6 +40,7 @@ from sahara.utils import api as api_utils
|
||||
from sahara.utils.openstack import cinder
|
||||
from sahara.utils import remote
|
||||
from sahara.utils import rpc as messaging
|
||||
from sahara.utils import wsgi
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
@ -58,7 +56,10 @@ opts = [
|
||||
cfg.StrOpt('remote',
|
||||
default='ssh',
|
||||
help='A method for Sahara to execute commands '
|
||||
'on VMs.')
|
||||
'on VMs.'),
|
||||
cfg.IntOpt('api_workers', default=0,
|
||||
help="Number of workers for Sahara API service (0 means "
|
||||
"all-in-one-thread configuration).")
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -193,9 +194,7 @@ def _get_ops_driver(driver_name):
|
||||
|
||||
|
||||
def start_server(app):
|
||||
sock = eventlet.listen((cfg.CONF.host, cfg.CONF.port), backlog=500)
|
||||
if sslutils.is_enabled():
|
||||
LOG.info(_LI("Using HTTPS for port %s"), cfg.CONF.port)
|
||||
sock = sslutils.wrap(sock)
|
||||
|
||||
wsgi.server(sock, app, log=loggers.WritableLogger(LOG), debug=False)
|
||||
server = wsgi.Server()
|
||||
server.start(app)
|
||||
systemd.notify_once()
|
||||
server.wait()
|
||||
|
105
sahara/openstack/common/systemd.py
Normal file
105
sahara/openstack/common/systemd.py
Normal file
@ -0,0 +1,105 @@
|
||||
# Copyright 2012-2014 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Helper module for systemd service readiness notification.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _abstractify(socket_name):
|
||||
if socket_name.startswith('@'):
|
||||
# abstract namespace socket
|
||||
socket_name = '\0%s' % socket_name[1:]
|
||||
return socket_name
|
||||
|
||||
|
||||
def _sd_notify(unset_env, msg):
|
||||
notify_socket = os.getenv('NOTIFY_SOCKET')
|
||||
if notify_socket:
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
||||
try:
|
||||
sock.connect(_abstractify(notify_socket))
|
||||
sock.sendall(msg)
|
||||
if unset_env:
|
||||
del os.environ['NOTIFY_SOCKET']
|
||||
except EnvironmentError:
|
||||
LOG.debug("Systemd notification failed", exc_info=True)
|
||||
finally:
|
||||
sock.close()
|
||||
|
||||
|
||||
def notify():
|
||||
"""Send notification to Systemd that service is ready.
|
||||
|
||||
For details see
|
||||
http://www.freedesktop.org/software/systemd/man/sd_notify.html
|
||||
"""
|
||||
_sd_notify(False, 'READY=1')
|
||||
|
||||
|
||||
def notify_once():
|
||||
"""Send notification once to Systemd that service is ready.
|
||||
|
||||
Systemd sets NOTIFY_SOCKET environment variable with the name of the
|
||||
socket listening for notifications from services.
|
||||
This method removes the NOTIFY_SOCKET environment variable to ensure
|
||||
notification is sent only once.
|
||||
"""
|
||||
_sd_notify(True, 'READY=1')
|
||||
|
||||
|
||||
def onready(notify_socket, timeout):
|
||||
"""Wait for systemd style notification on the socket.
|
||||
|
||||
:param notify_socket: local socket address
|
||||
:type notify_socket: string
|
||||
:param timeout: socket timeout
|
||||
:type timeout: float
|
||||
:returns: 0 service ready
|
||||
1 service not ready
|
||||
2 timeout occurred
|
||||
"""
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
||||
sock.settimeout(timeout)
|
||||
sock.bind(_abstractify(notify_socket))
|
||||
try:
|
||||
msg = sock.recv(512)
|
||||
except socket.timeout:
|
||||
return 2
|
||||
finally:
|
||||
sock.close()
|
||||
if 'READY=1' in msg:
|
||||
return 0
|
||||
else:
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# simple CLI for testing
|
||||
if len(sys.argv) == 1:
|
||||
notify()
|
||||
elif len(sys.argv) >= 2:
|
||||
timeout = float(sys.argv[1])
|
||||
notify_socket = os.getenv('NOTIFY_SOCKET')
|
||||
if notify_socket:
|
||||
retval = onready(notify_socket, timeout)
|
||||
sys.exit(retval)
|
@ -19,19 +19,30 @@
|
||||
"""Utility methods for working with WSGI servers."""
|
||||
|
||||
import datetime
|
||||
import errno
|
||||
import os
|
||||
import signal
|
||||
from xml.dom import minidom
|
||||
from xml.parsers import expat
|
||||
from xml import sax
|
||||
from xml.sax import expatreader
|
||||
|
||||
import eventlet
|
||||
from eventlet import wsgi
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_log import loggers
|
||||
from oslo_serialization import jsonutils
|
||||
import six
|
||||
|
||||
from sahara import exceptions
|
||||
from sahara.i18n import _
|
||||
from sahara.i18n import _LE
|
||||
from sahara.i18n import _LI
|
||||
from sahara.openstack.common import sslutils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class ProtectedExpatParser(expatreader.ExpatParser):
|
||||
@ -311,3 +322,111 @@ class XMLDeserializer(TextDeserializer):
|
||||
|
||||
def default(self, datastring):
|
||||
return {'body': self._from_xml(datastring)}
|
||||
|
||||
|
||||
class Server(object):
|
||||
"""Server class to manage multiple WSGI sockets and applications."""
|
||||
|
||||
def __init__(self, threads=500):
|
||||
self.threads = threads
|
||||
self.children = []
|
||||
self.running = True
|
||||
|
||||
def start(self, application):
|
||||
"""Run a WSGI server with the given application.
|
||||
|
||||
:param application: The application to run in the WSGI server
|
||||
"""
|
||||
def kill_children(*args):
|
||||
"""Kills the entire process group."""
|
||||
LOG.error(_LE('SIGTERM received'))
|
||||
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
||||
self.running = False
|
||||
os.killpg(0, signal.SIGTERM)
|
||||
|
||||
def hup(*args):
|
||||
"""Shuts down the server(s).
|
||||
|
||||
Shuts down the server(s), but allows running requests to complete
|
||||
"""
|
||||
LOG.error(_LE('SIGHUP received'))
|
||||
signal.signal(signal.SIGHUP, signal.SIG_IGN)
|
||||
os.killpg(0, signal.SIGHUP)
|
||||
signal.signal(signal.SIGHUP, hup)
|
||||
|
||||
self.application = application
|
||||
self.sock = eventlet.listen((CONF.host, CONF.port), backlog=500)
|
||||
if sslutils.is_enabled():
|
||||
LOG.info(_LI("Using HTTPS for port %s"), CONF.port)
|
||||
self.sock = sslutils.wrap(self.sock)
|
||||
|
||||
if CONF.api_workers == 0:
|
||||
# Useful for profiling, test, debug etc.
|
||||
self.pool = eventlet.GreenPool(size=self.threads)
|
||||
self.pool.spawn_n(self._single_run, application, self.sock)
|
||||
return
|
||||
|
||||
LOG.debug("Starting %d workers", CONF.api_workers)
|
||||
signal.signal(signal.SIGTERM, kill_children)
|
||||
signal.signal(signal.SIGHUP, hup)
|
||||
while len(self.children) < CONF.api_workers:
|
||||
self.run_child()
|
||||
|
||||
def wait_on_children(self):
|
||||
while self.running:
|
||||
try:
|
||||
pid, status = os.wait()
|
||||
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
|
||||
LOG.error(_LE('Removing dead child %s'), pid)
|
||||
self.children.remove(pid)
|
||||
self.run_child()
|
||||
except OSError as err:
|
||||
if err.errno not in (errno.EINTR, errno.ECHILD):
|
||||
raise
|
||||
except KeyboardInterrupt:
|
||||
LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
|
||||
os.killpg(0, signal.SIGTERM)
|
||||
break
|
||||
eventlet.greenio.shutdown_safe(self.sock)
|
||||
self.sock.close()
|
||||
LOG.debug('Server exited')
|
||||
|
||||
def wait(self):
|
||||
"""Wait until all servers have completed running."""
|
||||
try:
|
||||
if self.children:
|
||||
self.wait_on_children()
|
||||
else:
|
||||
self.pool.waitall()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
def run_child(self):
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
signal.signal(signal.SIGHUP, signal.SIG_DFL)
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
self.run_server()
|
||||
LOG.debug('Child %d exiting normally', os.getpid())
|
||||
return
|
||||
else:
|
||||
LOG.info(_LI('Started child %s'), pid)
|
||||
self.children.append(pid)
|
||||
|
||||
def run_server(self):
|
||||
"""Run a WSGI server."""
|
||||
self.pool = eventlet.GreenPool(size=self.threads)
|
||||
wsgi.server(self.sock,
|
||||
self.application,
|
||||
custom_pool=self.pool,
|
||||
log=loggers.WritableLogger(LOG),
|
||||
debug=False)
|
||||
self.pool.waitall()
|
||||
|
||||
def _single_run(self, application, sock):
|
||||
"""Start a WSGI server in a new green thread."""
|
||||
LOG.info(_LI("Starting single process server"))
|
||||
eventlet.wsgi.server(sock, application,
|
||||
custom_pool=self.pool,
|
||||
log=loggers.WritableLogger(LOG),
|
||||
debug=False)
|
||||
|
Loading…
Reference in New Issue
Block a user