From 349c140bbd4ac931f5d9946c3df182b00aba47ce Mon Sep 17 00:00:00 2001 From: Andrew Lazarev Date: Mon, 12 Jan 2015 14:18:39 -0800 Subject: [PATCH] Implemented multi-worker solution for Sahara API Implemented temporary solution for wsgi/flask. Should be replaced during migration to Pecan/WSME. Code is based on Heat code. Implements blueprint: sahara-api-workers Change-Id: Ica0ecdc58a1ab345b437e78590a5b29e1d0f3789 --- etc/sahara/sahara.conf.sample | 4 + openstack-common.conf | 1 + sahara/main.py | 21 +++-- sahara/openstack/common/systemd.py | 105 +++++++++++++++++++++++++ sahara/utils/wsgi.py | 119 +++++++++++++++++++++++++++++ 5 files changed, 239 insertions(+), 11 deletions(-) create mode 100644 sahara/openstack/common/systemd.py diff --git a/etc/sahara/sahara.conf.sample b/etc/sahara/sahara.conf.sample index 3c75858fdb..433a19b359 100644 --- a/etc/sahara/sahara.conf.sample +++ b/etc/sahara/sahara.conf.sample @@ -407,6 +407,10 @@ # A method for Sahara to execute commands on VMs. (string value) #remote = ssh +# Number of workers for Sahara API service (0 means all-in-one-thread +# configuration). (integer value) +#api_workers = 0 + # Postfix for storing jobs in hdfs. Will be added to '/user//' path. (string value) #job_workflow_postfix = diff --git a/openstack-common.conf b/openstack-common.conf index c508872a24..5c567df653 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -4,6 +4,7 @@ module=periodic_task module=policy module=sslutils +module=systemd module=threadgroup # The base module to hold the copy of openstack.common diff --git a/sahara/main.py b/sahara/main.py index af08c10a01..14ba1c75ef 100644 --- a/sahara/main.py +++ b/sahara/main.py @@ -15,12 +15,9 @@ import os -import eventlet -from eventlet import wsgi import flask from oslo_config import cfg from oslo_log import log -from oslo_log import loggers import six import stevedore from werkzeug import exceptions as werkzeug_exceptions @@ -33,7 +30,7 @@ from sahara.api import v11 as api_v11 from sahara import config from sahara import context from sahara.i18n import _LI -from sahara.openstack.common import sslutils +from sahara.openstack.common import systemd from sahara.plugins import base as plugins_base from sahara.service import api as service_api from sahara.service.edp import api as edp_api @@ -43,6 +40,7 @@ from sahara.utils import api as api_utils from sahara.utils.openstack import cinder from sahara.utils import remote from sahara.utils import rpc as messaging +from sahara.utils import wsgi LOG = log.getLogger(__name__) @@ -58,7 +56,10 @@ opts = [ cfg.StrOpt('remote', default='ssh', help='A method for Sahara to execute commands ' - 'on VMs.') + 'on VMs.'), + cfg.IntOpt('api_workers', default=0, + help="Number of workers for Sahara API service (0 means " + "all-in-one-thread configuration).") ] CONF = cfg.CONF @@ -193,9 +194,7 @@ def _get_ops_driver(driver_name): def start_server(app): - sock = eventlet.listen((cfg.CONF.host, cfg.CONF.port), backlog=500) - if sslutils.is_enabled(): - LOG.info(_LI("Using HTTPS for port %s"), cfg.CONF.port) - sock = sslutils.wrap(sock) - - wsgi.server(sock, app, log=loggers.WritableLogger(LOG), debug=False) + server = wsgi.Server() + server.start(app) + systemd.notify_once() + server.wait() diff --git a/sahara/openstack/common/systemd.py b/sahara/openstack/common/systemd.py new file mode 100644 index 0000000000..36243b342a --- /dev/null +++ b/sahara/openstack/common/systemd.py @@ -0,0 +1,105 @@ +# Copyright 2012-2014 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Helper module for systemd service readiness notification. +""" + +import logging +import os +import socket +import sys + + +LOG = logging.getLogger(__name__) + + +def _abstractify(socket_name): + if socket_name.startswith('@'): + # abstract namespace socket + socket_name = '\0%s' % socket_name[1:] + return socket_name + + +def _sd_notify(unset_env, msg): + notify_socket = os.getenv('NOTIFY_SOCKET') + if notify_socket: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + try: + sock.connect(_abstractify(notify_socket)) + sock.sendall(msg) + if unset_env: + del os.environ['NOTIFY_SOCKET'] + except EnvironmentError: + LOG.debug("Systemd notification failed", exc_info=True) + finally: + sock.close() + + +def notify(): + """Send notification to Systemd that service is ready. + + For details see + http://www.freedesktop.org/software/systemd/man/sd_notify.html + """ + _sd_notify(False, 'READY=1') + + +def notify_once(): + """Send notification once to Systemd that service is ready. + + Systemd sets NOTIFY_SOCKET environment variable with the name of the + socket listening for notifications from services. + This method removes the NOTIFY_SOCKET environment variable to ensure + notification is sent only once. + """ + _sd_notify(True, 'READY=1') + + +def onready(notify_socket, timeout): + """Wait for systemd style notification on the socket. + + :param notify_socket: local socket address + :type notify_socket: string + :param timeout: socket timeout + :type timeout: float + :returns: 0 service ready + 1 service not ready + 2 timeout occurred + """ + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + sock.settimeout(timeout) + sock.bind(_abstractify(notify_socket)) + try: + msg = sock.recv(512) + except socket.timeout: + return 2 + finally: + sock.close() + if 'READY=1' in msg: + return 0 + else: + return 1 + + +if __name__ == '__main__': + # simple CLI for testing + if len(sys.argv) == 1: + notify() + elif len(sys.argv) >= 2: + timeout = float(sys.argv[1]) + notify_socket = os.getenv('NOTIFY_SOCKET') + if notify_socket: + retval = onready(notify_socket, timeout) + sys.exit(retval) diff --git a/sahara/utils/wsgi.py b/sahara/utils/wsgi.py index 795c456f31..994362d4cc 100644 --- a/sahara/utils/wsgi.py +++ b/sahara/utils/wsgi.py @@ -19,19 +19,30 @@ """Utility methods for working with WSGI servers.""" import datetime +import errno +import os +import signal from xml.dom import minidom from xml.parsers import expat from xml import sax from xml.sax import expatreader +import eventlet +from eventlet import wsgi +from oslo_config import cfg from oslo_log import log as logging +from oslo_log import loggers from oslo_serialization import jsonutils import six from sahara import exceptions from sahara.i18n import _ +from sahara.i18n import _LE +from sahara.i18n import _LI +from sahara.openstack.common import sslutils LOG = logging.getLogger(__name__) +CONF = cfg.CONF class ProtectedExpatParser(expatreader.ExpatParser): @@ -311,3 +322,111 @@ class XMLDeserializer(TextDeserializer): def default(self, datastring): return {'body': self._from_xml(datastring)} + + +class Server(object): + """Server class to manage multiple WSGI sockets and applications.""" + + def __init__(self, threads=500): + self.threads = threads + self.children = [] + self.running = True + + def start(self, application): + """Run a WSGI server with the given application. + + :param application: The application to run in the WSGI server + """ + def kill_children(*args): + """Kills the entire process group.""" + LOG.error(_LE('SIGTERM received')) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + self.running = False + os.killpg(0, signal.SIGTERM) + + def hup(*args): + """Shuts down the server(s). + + Shuts down the server(s), but allows running requests to complete + """ + LOG.error(_LE('SIGHUP received')) + signal.signal(signal.SIGHUP, signal.SIG_IGN) + os.killpg(0, signal.SIGHUP) + signal.signal(signal.SIGHUP, hup) + + self.application = application + self.sock = eventlet.listen((CONF.host, CONF.port), backlog=500) + if sslutils.is_enabled(): + LOG.info(_LI("Using HTTPS for port %s"), CONF.port) + self.sock = sslutils.wrap(self.sock) + + if CONF.api_workers == 0: + # Useful for profiling, test, debug etc. + self.pool = eventlet.GreenPool(size=self.threads) + self.pool.spawn_n(self._single_run, application, self.sock) + return + + LOG.debug("Starting %d workers", CONF.api_workers) + signal.signal(signal.SIGTERM, kill_children) + signal.signal(signal.SIGHUP, hup) + while len(self.children) < CONF.api_workers: + self.run_child() + + def wait_on_children(self): + while self.running: + try: + pid, status = os.wait() + if os.WIFEXITED(status) or os.WIFSIGNALED(status): + LOG.error(_LE('Removing dead child %s'), pid) + self.children.remove(pid) + self.run_child() + except OSError as err: + if err.errno not in (errno.EINTR, errno.ECHILD): + raise + except KeyboardInterrupt: + LOG.info(_LI('Caught keyboard interrupt. Exiting.')) + os.killpg(0, signal.SIGTERM) + break + eventlet.greenio.shutdown_safe(self.sock) + self.sock.close() + LOG.debug('Server exited') + + def wait(self): + """Wait until all servers have completed running.""" + try: + if self.children: + self.wait_on_children() + else: + self.pool.waitall() + except KeyboardInterrupt: + pass + + def run_child(self): + pid = os.fork() + if pid == 0: + signal.signal(signal.SIGHUP, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + self.run_server() + LOG.debug('Child %d exiting normally', os.getpid()) + return + else: + LOG.info(_LI('Started child %s'), pid) + self.children.append(pid) + + def run_server(self): + """Run a WSGI server.""" + self.pool = eventlet.GreenPool(size=self.threads) + wsgi.server(self.sock, + self.application, + custom_pool=self.pool, + log=loggers.WritableLogger(LOG), + debug=False) + self.pool.waitall() + + def _single_run(self, application, sock): + """Start a WSGI server in a new green thread.""" + LOG.info(_LI("Starting single process server")) + eventlet.wsgi.server(sock, application, + custom_pool=self.pool, + log=loggers.WritableLogger(LOG), + debug=False)