Test for shutting down eventlet server on signal
There was no test that verified that when an eventlet-based server was sent a SIGTERM or SIGINT that it would shut down even if there was a connected client. Change-Id: I5e1d008bcf5413307a651620d267836ac493070d Related-Bug: 1446583
This commit is contained in:
168
tests/unit/eventlet_service.py
Normal file
168
tests/unit/eventlet_service.py
Normal file
@@ -0,0 +1,168 @@
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# An eventlet server that runs a service.py pool.
|
||||
|
||||
# Opens listens on a random port. The port # is printed to stdout.
|
||||
|
||||
import socket
|
||||
import sys
|
||||
|
||||
import eventlet.wsgi
|
||||
import greenlet
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from openstack.common import service
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
POOL_SIZE = 1
|
||||
|
||||
|
||||
class Server(object):
|
||||
"""Server class to manage multiple WSGI sockets and applications."""
|
||||
|
||||
def __init__(self, application, host=None, port=None, keepalive=False,
|
||||
keepidle=None):
|
||||
self.application = application
|
||||
self.host = host or '0.0.0.0'
|
||||
self.port = port or 0
|
||||
# Pool for a green thread in which wsgi server will be running
|
||||
self.pool = eventlet.GreenPool(POOL_SIZE)
|
||||
self.socket_info = {}
|
||||
self.greenthread = None
|
||||
self.keepalive = keepalive
|
||||
self.keepidle = keepidle
|
||||
self.socket = None
|
||||
|
||||
def listen(self, key=None, backlog=128):
|
||||
"""Create and start listening on socket.
|
||||
|
||||
Call before forking worker processes.
|
||||
|
||||
Raises Exception if this has already been called.
|
||||
"""
|
||||
|
||||
# TODO(dims): eventlet's green dns/socket module does not actually
|
||||
# support IPv6 in getaddrinfo(). We need to get around this in the
|
||||
# future or monitor upstream for a fix.
|
||||
# Please refer below link
|
||||
# (https://bitbucket.org/eventlet/eventlet/
|
||||
# src/e0f578180d7d82d2ed3d8a96d520103503c524ec/eventlet/support/
|
||||
# greendns.py?at=0.12#cl-163)
|
||||
info = socket.getaddrinfo(self.host,
|
||||
self.port,
|
||||
socket.AF_UNSPEC,
|
||||
socket.SOCK_STREAM)[0]
|
||||
|
||||
self.socket = eventlet.listen(info[-1], family=info[0],
|
||||
backlog=backlog)
|
||||
|
||||
def start(self, key=None, backlog=128):
|
||||
"""Run a WSGI server with the given application."""
|
||||
|
||||
if self.socket is None:
|
||||
self.listen(key=key, backlog=backlog)
|
||||
|
||||
dup_socket = self.socket.dup()
|
||||
if key:
|
||||
self.socket_info[key] = self.socket.getsockname()
|
||||
|
||||
# Optionally enable keepalive on the wsgi socket.
|
||||
if self.keepalive:
|
||||
dup_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
||||
|
||||
if self.keepidle is not None:
|
||||
dup_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
|
||||
self.keepidle)
|
||||
|
||||
self.greenthread = self.pool.spawn(self._run,
|
||||
self.application,
|
||||
dup_socket)
|
||||
|
||||
def stop(self):
|
||||
if self.greenthread is not None:
|
||||
self.greenthread.kill()
|
||||
|
||||
def wait(self):
|
||||
"""Wait until all servers have completed running."""
|
||||
try:
|
||||
self.pool.waitall()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except greenlet.GreenletExit:
|
||||
pass
|
||||
|
||||
def reset(self):
|
||||
"""Required by the service interface.
|
||||
|
||||
The service interface is used by the launcher when receiving a
|
||||
SIGHUP. The service interface is defined in
|
||||
openstack.common.service.Service.
|
||||
|
||||
Test server does not need to do anything here.
|
||||
"""
|
||||
pass
|
||||
|
||||
def _run(self, application, socket):
|
||||
"""Start a WSGI server with a new green thread pool."""
|
||||
try:
|
||||
eventlet.wsgi.server(socket, application, debug=False)
|
||||
except greenlet.GreenletExit:
|
||||
# Wait until all servers have completed running
|
||||
pass
|
||||
|
||||
|
||||
class ServerWrapper(object):
|
||||
"""Wraps a Server with some launching info & capabilities."""
|
||||
|
||||
def __init__(self, server, workers):
|
||||
self.server = server
|
||||
self.workers = workers
|
||||
|
||||
def launch_with(self, launcher):
|
||||
self.server.listen()
|
||||
if self.workers > 1:
|
||||
# Use multi-process launcher
|
||||
launcher.launch_service(self.server, self.workers)
|
||||
else:
|
||||
# Use single process launcher
|
||||
launcher.launch_service(self.server)
|
||||
|
||||
|
||||
def run():
|
||||
|
||||
CONF()
|
||||
|
||||
eventlet.patcher.monkey_patch()
|
||||
|
||||
launcher = service.ProcessLauncher()
|
||||
|
||||
def hi_app(environ, start_response):
|
||||
start_response('200 OK', [('Content-Type', 'application/json')])
|
||||
yield 'hi'
|
||||
|
||||
server = ServerWrapper(Server(hi_app), workers=3)
|
||||
server.launch_with(launcher)
|
||||
|
||||
print('%s' % server.server.socket.getsockname()[1])
|
||||
|
||||
sys.stdout.flush()
|
||||
|
||||
launcher.wait()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
run()
|
||||
@@ -20,12 +20,16 @@ Unit Tests for service class
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import threading
|
||||
|
||||
import errno
|
||||
import logging
|
||||
import multiprocessing
|
||||
import os
|
||||
import signal
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
|
||||
@@ -36,6 +40,7 @@ from mox3 import mox
|
||||
from oslo_config import fixture as config
|
||||
from oslotest import base as test_base
|
||||
from oslotest import moxstubout
|
||||
from six.moves import queue
|
||||
|
||||
from openstack.common import eventlet_backdoor
|
||||
from openstack.common import service
|
||||
@@ -477,3 +482,52 @@ class ServiceTest(test_base.BaseTestCase):
|
||||
# Here we stop ungracefully, and will never see the task finish.
|
||||
self.assertEqual("Timeout!",
|
||||
exercise_graceful_test_service(1, 2, False))
|
||||
|
||||
|
||||
class EventletServerTest(test_base.BaseTestCase):
|
||||
def test_shuts_down_on_sigterm_when_client_connected(self):
|
||||
|
||||
server_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
|
||||
'eventlet_service.py')
|
||||
|
||||
# Start up an eventlet server.
|
||||
server = subprocess.Popen([sys.executable, server_path],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
bufsize=1000,
|
||||
close_fds=True)
|
||||
|
||||
def enqueue_output(f, q):
|
||||
while True:
|
||||
line = f.readline()
|
||||
if not line:
|
||||
break
|
||||
q.put(line)
|
||||
f.close()
|
||||
|
||||
# Start a thread to read stderr so the app doesn't block.
|
||||
err_q = queue.Queue()
|
||||
err_t = threading.Thread(target=enqueue_output,
|
||||
args=(server.stderr, err_q))
|
||||
err_t.daemon = True
|
||||
err_t.start()
|
||||
|
||||
# The server's line of output is the port it picked.
|
||||
port_str = server.stdout.readline()
|
||||
port = int(port_str)
|
||||
|
||||
# connect to the server.
|
||||
conn = socket.create_connection(('127.0.0.1', port))
|
||||
|
||||
# NOTE(blk-u): The sleep shouldn't be necessary. There must be a bug in
|
||||
# the server implementation where it takes some time to set up the
|
||||
# server or signal handlers.
|
||||
time.sleep(1)
|
||||
|
||||
# send SIGTERM to the server and wait for it to exit while client still
|
||||
# connected.
|
||||
server.send_signal(signal.SIGTERM)
|
||||
|
||||
server.wait()
|
||||
|
||||
conn.close()
|
||||
|
||||
Reference in New Issue
Block a user