Merge "Remove eventlet WSGI functionality"
This commit is contained in:
commit
e7965dc323
@ -19,11 +19,11 @@ WSGI middleware for OpenStack API controllers.
|
||||
"""
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import wsgi as base_wsgi
|
||||
import routes
|
||||
|
||||
from cinder.api.openstack import wsgi
|
||||
from cinder.i18n import _, _LW
|
||||
from cinder.wsgi import common as base_wsgi
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
@ -38,9 +38,6 @@ CONF = cfg.CONF
|
||||
logging.register_options(CONF)
|
||||
|
||||
core_opts = [
|
||||
cfg.StrOpt('api_paste_config',
|
||||
default="api-paste.ini",
|
||||
help='File name for the paste.deploy config for cinder-api'),
|
||||
cfg.StrOpt('state_path',
|
||||
default='/var/lib/cinder',
|
||||
deprecated_name='pybasedir',
|
||||
|
@ -192,7 +192,6 @@ def list_opts():
|
||||
cinder_volume_drivers_san_san.san_opts,
|
||||
cinder_volume_drivers_hitachi_hnasnfs.NFS_OPTS,
|
||||
cinder_wsgi_eventletserver.socket_opts,
|
||||
cinder_wsgi_eventletserver.eventlet_opts,
|
||||
cinder_sshutils.ssh_opts,
|
||||
cinder_volume_drivers_netapp_options.netapp_proxy_opts,
|
||||
cinder_volume_drivers_netapp_options.netapp_connection_opts,
|
||||
|
@ -370,7 +370,8 @@ class WSGIService(service.ServiceBase):
|
||||
raise exception.InvalidInput(msg)
|
||||
setup_profiler(name, self.host)
|
||||
|
||||
self.server = wsgi.Server(name,
|
||||
self.server = wsgi.Server(CONF,
|
||||
name,
|
||||
self.app,
|
||||
host=self.host,
|
||||
port=self.port)
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
import uuid
|
||||
|
||||
from oslo_service import wsgi
|
||||
from oslo_utils import timeutils
|
||||
import routes
|
||||
import webob
|
||||
@ -29,7 +30,6 @@ from cinder.api.v2 import limits
|
||||
from cinder.api.v2 import router
|
||||
from cinder.api import versions
|
||||
from cinder import context
|
||||
from cinder.wsgi import common as wsgi
|
||||
|
||||
|
||||
FAKE_UUID = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
|
||||
|
@ -276,25 +276,25 @@ class TestWSGIService(test.TestCase):
|
||||
self.assertEqual(1000, test_service.server._pool.size)
|
||||
self.assertTrue(mock_load_app.called)
|
||||
|
||||
@mock.patch('cinder.wsgi.eventlet_server.Server')
|
||||
@mock.patch('oslo_service.wsgi.Server')
|
||||
def test_workers_set_default(self, wsgi_server):
|
||||
test_service = service.WSGIService("osapi_volume")
|
||||
self.assertEqual(processutils.get_worker_count(), test_service.workers)
|
||||
|
||||
@mock.patch('cinder.wsgi.eventlet_server.Server')
|
||||
@mock.patch('oslo_service.wsgi.Server')
|
||||
def test_workers_set_good_user_setting(self, wsgi_server):
|
||||
self.override_config('osapi_volume_workers', 8)
|
||||
test_service = service.WSGIService("osapi_volume")
|
||||
self.assertEqual(8, test_service.workers)
|
||||
|
||||
@mock.patch('cinder.wsgi.eventlet_server.Server')
|
||||
@mock.patch('oslo_service.wsgi.Server')
|
||||
def test_workers_set_zero_user_setting(self, wsgi_server):
|
||||
self.override_config('osapi_volume_workers', 0)
|
||||
test_service = service.WSGIService("osapi_volume")
|
||||
# If a value less than 1 is used, defaults to number of procs available
|
||||
self.assertEqual(processutils.get_worker_count(), test_service.workers)
|
||||
|
||||
@mock.patch('cinder.wsgi.eventlet_server.Server')
|
||||
@mock.patch('oslo_service.wsgi.Server')
|
||||
def test_workers_set_negative_user_setting(self, wsgi_server):
|
||||
self.override_config('osapi_volume_workers', -1)
|
||||
self.assertRaises(exception.InvalidInput,
|
||||
|
@ -1,395 +0,0 @@
|
||||
# Copyright 2011 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Unit tests for `cinder.wsgi`."""
|
||||
|
||||
import os.path
|
||||
import re
|
||||
import socket
|
||||
import ssl
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_i18n import fixture as i18n_fixture
|
||||
import six
|
||||
from six.moves import urllib
|
||||
import testtools
|
||||
import webob
|
||||
import webob.dec
|
||||
|
||||
from cinder import exception
|
||||
from cinder.i18n import _
|
||||
from cinder import test
|
||||
from cinder.wsgi import common as wsgi_common
|
||||
from cinder.wsgi import eventlet_server as wsgi
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
|
||||
'../var'))
|
||||
|
||||
|
||||
def open_no_proxy(*args, **kwargs):
|
||||
# NOTE(coreycb):
|
||||
# Deal with more secure certification chain verficiation
|
||||
# introduced in python 2.7.9 under PEP-0476
|
||||
# https://github.com/python/peps/blob/master/pep-0476.txt
|
||||
if hasattr(ssl, "_create_unverified_context"):
|
||||
context = ssl._create_unverified_context()
|
||||
opener = urllib.request.build_opener(
|
||||
urllib.request.ProxyHandler({}),
|
||||
urllib.request.HTTPSHandler(context=context)
|
||||
)
|
||||
else:
|
||||
opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
|
||||
return opener.open(*args, **kwargs)
|
||||
|
||||
|
||||
class TestLoaderNothingExists(test.TestCase):
|
||||
"""Loader tests where os.path.exists always returns False."""
|
||||
|
||||
def setUp(self):
|
||||
super(TestLoaderNothingExists, self).setUp()
|
||||
self.stubs.Set(os.path, 'exists', lambda _: False)
|
||||
|
||||
def test_config_not_found(self):
|
||||
self.assertRaises(
|
||||
exception.ConfigNotFound,
|
||||
wsgi_common.Loader,
|
||||
)
|
||||
|
||||
|
||||
class TestLoaderNormalFilesystem(test.TestCase):
|
||||
"""Loader tests with normal filesystem (unmodified os.path module)."""
|
||||
|
||||
_paste_config = """
|
||||
[app:test_app]
|
||||
use = egg:Paste#static
|
||||
document_root = /tmp
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestLoaderNormalFilesystem, self).setUp()
|
||||
self.config = tempfile.NamedTemporaryFile(mode="w+t")
|
||||
self.config.write(self._paste_config.lstrip())
|
||||
self.config.seek(0)
|
||||
self.config.flush()
|
||||
self.loader = wsgi_common.Loader(self.config.name)
|
||||
self.addCleanup(self.config.close)
|
||||
|
||||
def test_config_found(self):
|
||||
self.assertEqual(self.config.name, self.loader.config_path)
|
||||
|
||||
def test_app_not_found(self):
|
||||
self.assertRaises(
|
||||
exception.PasteAppNotFound,
|
||||
self.loader.load_app,
|
||||
"non-existent app",
|
||||
)
|
||||
|
||||
def test_app_found(self):
|
||||
url_parser = self.loader.load_app("test_app")
|
||||
self.assertEqual("/tmp", url_parser.directory)
|
||||
|
||||
|
||||
class TestWSGIServer(test.TestCase):
|
||||
"""WSGI server tests."""
|
||||
def _ipv6_configured():
|
||||
try:
|
||||
with open('/proc/net/if_inet6') as f:
|
||||
return len(f.read()) > 0
|
||||
except IOError:
|
||||
return False
|
||||
|
||||
def test_no_app(self):
|
||||
server = wsgi.Server("test_app", None,
|
||||
host="127.0.0.1", port=0)
|
||||
self.assertEqual("test_app", server.name)
|
||||
|
||||
def test_start_random_port(self):
|
||||
server = wsgi.Server("test_random_port", None, host="127.0.0.1")
|
||||
server.start()
|
||||
self.assertNotEqual(0, server.port)
|
||||
server.stop()
|
||||
server.wait()
|
||||
|
||||
@testtools.skipIf(not _ipv6_configured(),
|
||||
"Test requires an IPV6 configured interface")
|
||||
def test_start_random_port_with_ipv6(self):
|
||||
server = wsgi.Server("test_random_port",
|
||||
None,
|
||||
host="::1")
|
||||
server.start()
|
||||
self.assertEqual("::1", server.host)
|
||||
self.assertNotEqual(0, server.port)
|
||||
server.stop()
|
||||
server.wait()
|
||||
|
||||
def test_server_pool_waitall(self):
|
||||
# test pools waitall method gets called while stopping server
|
||||
server = wsgi.Server("test_server", None,
|
||||
host="127.0.0.1")
|
||||
server.start()
|
||||
with mock.patch.object(server._pool,
|
||||
'waitall') as mock_waitall:
|
||||
server.stop()
|
||||
server.wait()
|
||||
mock_waitall.assert_called_once_with()
|
||||
|
||||
def test_app(self):
|
||||
greetings = b'Hello, World!!!'
|
||||
|
||||
def hello_world(env, start_response):
|
||||
if env['PATH_INFO'] != '/':
|
||||
start_response('404 Not Found',
|
||||
[('Content-Type', 'text/plain')])
|
||||
return ['Not Found\r\n']
|
||||
start_response('200 OK', [('Content-Type', 'text/plain')])
|
||||
return [greetings]
|
||||
|
||||
server = wsgi.Server("test_app", hello_world,
|
||||
host="127.0.0.1", port=0)
|
||||
server.start()
|
||||
|
||||
response = open_no_proxy('http://127.0.0.1:%d/' % server.port)
|
||||
self.assertEqual(greetings, response.read())
|
||||
server.stop()
|
||||
|
||||
def test_client_socket_timeout(self):
|
||||
CONF.set_default("client_socket_timeout", 0.1)
|
||||
greetings = b'Hello, World!!!'
|
||||
|
||||
def hello_world(env, start_response):
|
||||
start_response('200 OK', [('Content-Type', 'text/plain')])
|
||||
return [greetings]
|
||||
|
||||
server = wsgi.Server("test_app", hello_world,
|
||||
host="127.0.0.1", port=0)
|
||||
server.start()
|
||||
|
||||
s = socket.socket()
|
||||
s.connect(("127.0.0.1", server.port))
|
||||
|
||||
fd = s.makefile('rwb')
|
||||
fd.write(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
|
||||
fd.flush()
|
||||
|
||||
buf = fd.read()
|
||||
self.assertTrue(re.search(greetings, buf))
|
||||
|
||||
s2 = socket.socket()
|
||||
s2.connect(("127.0.0.1", server.port))
|
||||
time.sleep(0.2)
|
||||
|
||||
fd = s2.makefile('rwb')
|
||||
fd.write(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
|
||||
fd.flush()
|
||||
|
||||
buf = fd.read()
|
||||
# connection is closed so we get nothing from the server
|
||||
self.assertFalse(buf)
|
||||
server.stop()
|
||||
|
||||
@testtools.skipIf(six.PY3, "bug/1505103: test hangs on Python 3")
|
||||
def test_app_using_ssl(self):
|
||||
CONF.set_default("ssl_cert_file",
|
||||
os.path.join(TEST_VAR_DIR, 'certificate.crt'))
|
||||
CONF.set_default("ssl_key_file",
|
||||
os.path.join(TEST_VAR_DIR, 'privatekey.key'))
|
||||
|
||||
greetings = 'Hello, World!!!'
|
||||
|
||||
@webob.dec.wsgify
|
||||
def hello_world(req):
|
||||
return greetings
|
||||
|
||||
server = wsgi.Server("test_app", hello_world,
|
||||
host="127.0.0.1", port=0)
|
||||
|
||||
server.start()
|
||||
|
||||
response = open_no_proxy('https://127.0.0.1:%d/' % server.port)
|
||||
self.assertEqual(greetings, response.read())
|
||||
|
||||
server.stop()
|
||||
|
||||
@testtools.skipIf(not _ipv6_configured(),
|
||||
"Test requires an IPV6 configured interface")
|
||||
@testtools.skipIf(six.PY3, "bug/1505103: test hangs on Python 3")
|
||||
def test_app_using_ipv6_and_ssl(self):
|
||||
CONF.set_default("ssl_cert_file",
|
||||
os.path.join(TEST_VAR_DIR, 'certificate.crt'))
|
||||
CONF.set_default("ssl_key_file",
|
||||
os.path.join(TEST_VAR_DIR, 'privatekey.key'))
|
||||
|
||||
greetings = 'Hello, World!!!'
|
||||
|
||||
@webob.dec.wsgify
|
||||
def hello_world(req):
|
||||
return greetings
|
||||
|
||||
server = wsgi.Server("test_app",
|
||||
hello_world,
|
||||
host="::1",
|
||||
port=0)
|
||||
server.start()
|
||||
|
||||
response = open_no_proxy('https://[::1]:%d/' % server.port)
|
||||
self.assertEqual(greetings, response.read())
|
||||
|
||||
server.stop()
|
||||
|
||||
def test_reset_pool_size_to_default(self):
|
||||
server = wsgi.Server("test_resize", None, host="127.0.0.1")
|
||||
server.start()
|
||||
|
||||
# Stopping the server, which in turn sets pool size to 0
|
||||
server.stop()
|
||||
self.assertEqual(0, server._pool.size)
|
||||
|
||||
# Resetting pool size to default
|
||||
server.reset()
|
||||
server.start()
|
||||
self.assertEqual(1000, server._pool.size)
|
||||
|
||||
|
||||
class ExceptionTest(test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(ExceptionTest, self).setUp()
|
||||
self.useFixture(i18n_fixture.ToggleLazy(True))
|
||||
|
||||
def _wsgi_app(self, inner_app):
|
||||
# NOTE(luisg): In order to test localization, we need to
|
||||
# make sure the lazy _() is installed in the 'fault' module
|
||||
# also we don't want to install the _() system-wide and
|
||||
# potentially break other test cases, so we do it here for this
|
||||
# test suite only.
|
||||
from cinder.api.middleware import fault
|
||||
return fault.FaultWrapper(inner_app)
|
||||
|
||||
def _do_test_exception_safety_reflected_in_faults(self, expose):
|
||||
class ExceptionWithSafety(exception.CinderException):
|
||||
safe = expose
|
||||
|
||||
@webob.dec.wsgify
|
||||
def fail(req):
|
||||
raise ExceptionWithSafety('some explanation')
|
||||
|
||||
api = self._wsgi_app(fail)
|
||||
resp = webob.Request.blank('/').get_response(api)
|
||||
self.assertIn(b'{"computeFault', resp.body)
|
||||
expected = (b'ExceptionWithSafety: some explanation' if expose else
|
||||
b'The server has either erred or is incapable '
|
||||
b'of performing the requested operation.')
|
||||
self.assertIn(expected, resp.body)
|
||||
self.assertEqual(500, resp.status_int, resp.body)
|
||||
|
||||
def test_safe_exceptions_are_described_in_faults(self):
|
||||
self._do_test_exception_safety_reflected_in_faults(True)
|
||||
|
||||
def test_unsafe_exceptions_are_not_described_in_faults(self):
|
||||
self._do_test_exception_safety_reflected_in_faults(False)
|
||||
|
||||
def _do_test_exception_mapping(self, exception_type, msg):
|
||||
@webob.dec.wsgify
|
||||
def fail(req):
|
||||
raise exception_type(msg)
|
||||
|
||||
api = self._wsgi_app(fail)
|
||||
resp = webob.Request.blank('/').get_response(api)
|
||||
msg_body = (msg.encode('utf-8') if isinstance(msg, six.text_type)
|
||||
else msg)
|
||||
self.assertIn(msg_body, resp.body)
|
||||
self.assertEqual(exception_type.code, resp.status_int, resp.body)
|
||||
|
||||
if hasattr(exception_type, 'headers'):
|
||||
for (key, value) in exception_type.headers.items():
|
||||
self.assertIn(key, resp.headers)
|
||||
self.assertEqual(resp.headers[key], value)
|
||||
|
||||
def test_quota_error_mapping(self):
|
||||
self._do_test_exception_mapping(exception.QuotaError, 'too many used')
|
||||
|
||||
def test_non_cinder_notfound_exception_mapping(self):
|
||||
class ExceptionWithCode(Exception):
|
||||
code = 404
|
||||
|
||||
self._do_test_exception_mapping(ExceptionWithCode,
|
||||
'NotFound')
|
||||
|
||||
def test_non_cinder_exception_mapping(self):
|
||||
class ExceptionWithCode(Exception):
|
||||
code = 417
|
||||
|
||||
self._do_test_exception_mapping(ExceptionWithCode,
|
||||
'Expectation failed')
|
||||
|
||||
def test_exception_with_none_code_throws_500(self):
|
||||
class ExceptionWithNoneCode(Exception):
|
||||
code = None
|
||||
|
||||
@webob.dec.wsgify
|
||||
def fail(req):
|
||||
raise ExceptionWithNoneCode()
|
||||
|
||||
api = self._wsgi_app(fail)
|
||||
resp = webob.Request.blank('/').get_response(api)
|
||||
self.assertEqual(500, resp.status_int)
|
||||
|
||||
@mock.patch('cinder.i18n.translate')
|
||||
def test_cinder_exception_with_localized_explanation(self, mock_t9n):
|
||||
msg = 'My Not Found'
|
||||
msg_translation = 'Mi No Encontrado'
|
||||
message = _(msg) # noqa
|
||||
|
||||
@webob.dec.wsgify
|
||||
def fail(req):
|
||||
class MyVolumeNotFound(exception.NotFound):
|
||||
def __init__(self):
|
||||
self.msg = message
|
||||
self.safe = True
|
||||
raise MyVolumeNotFound()
|
||||
|
||||
# Test response without localization
|
||||
def mock_get_non_localized_message(msgid, locale):
|
||||
return msg
|
||||
|
||||
mock_t9n.side_effect = mock_get_non_localized_message
|
||||
|
||||
api = self._wsgi_app(fail)
|
||||
resp = webob.Request.blank('/').get_response(api)
|
||||
self.assertEqual(404, resp.status_int)
|
||||
msg_body = (msg.encode('utf-8') if isinstance(msg, six.text_type)
|
||||
else msg)
|
||||
self.assertIn(msg_body, resp.body)
|
||||
|
||||
# Test response with localization
|
||||
def mock_translate(msgid, locale):
|
||||
return msg_translation
|
||||
|
||||
mock_t9n.side_effect = mock_translate
|
||||
|
||||
api = self._wsgi_app(fail)
|
||||
resp = webob.Request.blank('/').get_response(api)
|
||||
self.assertEqual(404, resp.status_int)
|
||||
if isinstance(msg_translation, six.text_type):
|
||||
msg_body = msg_translation.encode('utf-8')
|
||||
else:
|
||||
msg_body = msg_translation
|
||||
self.assertIn(msg_body, resp.body)
|
@ -1,70 +0,0 @@
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# Copyright 2010 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Test WSGI basics and provide some helper functions for other WSGI tests.
|
||||
"""
|
||||
|
||||
import mock
|
||||
import six
|
||||
|
||||
from cinder import test
|
||||
|
||||
import routes
|
||||
import webob
|
||||
|
||||
from cinder.wsgi import common as wsgi
|
||||
|
||||
|
||||
class Test(test.TestCase):
|
||||
|
||||
def test_debug(self):
|
||||
|
||||
class Application(wsgi.Application):
|
||||
"""Dummy application to test debug."""
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
start_response("200", [("X-Test", "checking")])
|
||||
return [b'Test result']
|
||||
|
||||
with mock.patch('sys.stdout', new=six.StringIO()) as mock_stdout:
|
||||
mock_stdout.buffer = six.BytesIO()
|
||||
application = wsgi.Debug(Application())
|
||||
result = webob.Request.blank('/').get_response(application)
|
||||
self.assertEqual(b"Test result", result.body)
|
||||
|
||||
def test_router(self):
|
||||
|
||||
class Application(wsgi.Application):
|
||||
"""Test application to call from router."""
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
start_response("200", [])
|
||||
return [b'Router result']
|
||||
|
||||
class Router(wsgi.Router):
|
||||
"""Test router."""
|
||||
|
||||
def __init__(self):
|
||||
mapper = routes.Mapper()
|
||||
mapper.connect("/test", controller=Application())
|
||||
super(Router, self).__init__(mapper)
|
||||
|
||||
result = webob.Request.blank('/test').get_response(Router())
|
||||
self.assertEqual(b"Router result", result.body)
|
||||
result = webob.Request.blank('/bad').get_response(Router())
|
||||
self.assertNotEqual(b"Router result", result.body)
|
@ -14,13 +14,10 @@
|
||||
|
||||
"""Utility methods for working with WSGI servers."""
|
||||
|
||||
import sys
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import wsgi
|
||||
from paste import deploy
|
||||
import routes.middleware
|
||||
import six
|
||||
import webob.dec
|
||||
import webob.exc
|
||||
|
||||
@ -164,104 +161,6 @@ class Middleware(Application):
|
||||
return self.process_response(response)
|
||||
|
||||
|
||||
class Debug(Middleware):
|
||||
"""Helper class for debugging a WSGI application.
|
||||
|
||||
Can be inserted into any WSGI application chain to get information
|
||||
about the request and response.
|
||||
|
||||
"""
|
||||
|
||||
@webob.dec.wsgify(RequestClass=Request)
|
||||
def __call__(self, req):
|
||||
print(('*' * 40) + ' REQUEST ENVIRON') # noqa
|
||||
for key, value in req.environ.items():
|
||||
print(key, '=', value) # noqa
|
||||
print() # noqa
|
||||
resp = req.get_response(self.application)
|
||||
|
||||
print(('*' * 40) + ' RESPONSE HEADERS') # noqa
|
||||
for (key, value) in resp.headers.items():
|
||||
print(key, '=', value) # noqa
|
||||
print() # noqa
|
||||
|
||||
resp.app_iter = self.print_generator(resp.app_iter)
|
||||
|
||||
return resp
|
||||
|
||||
@staticmethod
|
||||
def print_generator(app_iter):
|
||||
"""Iterator that prints the contents of a wrapper string."""
|
||||
print(('*' * 40) + ' BODY') # noqa
|
||||
for part in app_iter:
|
||||
if six.PY3:
|
||||
sys.stdout.flush()
|
||||
sys.stdout.buffer.write(part) # pylint: disable=E1101
|
||||
sys.stdout.buffer.flush() # pylint: disable=E1101
|
||||
else:
|
||||
sys.stdout.write(part)
|
||||
sys.stdout.flush()
|
||||
yield part
|
||||
print() # noqa
|
||||
|
||||
|
||||
class Router(object):
|
||||
"""WSGI middleware that maps incoming requests to WSGI apps."""
|
||||
|
||||
def __init__(self, mapper):
|
||||
"""Create a router for the given routes.Mapper.
|
||||
|
||||
Each route in `mapper` must specify a 'controller', which is a
|
||||
WSGI app to call. You'll probably want to specify an 'action' as
|
||||
well and have your controller be an object that can route
|
||||
the request to the action-specific method.
|
||||
|
||||
Examples:
|
||||
mapper = routes.Mapper()
|
||||
sc = ServerController()
|
||||
|
||||
# Explicit mapping of one route to a controller+action
|
||||
mapper.connect(None, '/svrlist', controller=sc, action='list')
|
||||
|
||||
# Actions are all implicitly defined
|
||||
mapper.resource('server', 'servers', controller=sc)
|
||||
|
||||
# Pointing to an arbitrary WSGI app. You can specify the
|
||||
# {path_info:.*} parameter so the target app can be handed just that
|
||||
# section of the URL.
|
||||
mapper.connect(None, '/v1.0/{path_info:.*}', controller=BlogApp())
|
||||
|
||||
"""
|
||||
self.map = mapper
|
||||
self._router = routes.middleware.RoutesMiddleware(self._dispatch,
|
||||
self.map)
|
||||
|
||||
@webob.dec.wsgify(RequestClass=Request)
|
||||
def __call__(self, req):
|
||||
"""Route the incoming request to a controller based on self.map.
|
||||
|
||||
If no match, return a 404.
|
||||
|
||||
"""
|
||||
return self._router
|
||||
|
||||
@staticmethod
|
||||
@webob.dec.wsgify(RequestClass=Request)
|
||||
def _dispatch(req):
|
||||
"""Dispatch the request to the appropriate controller.
|
||||
|
||||
Called by self._router after matching the incoming request to a route
|
||||
and putting the information into req.environ. Either returns 404
|
||||
or the routed WSGI app's response.
|
||||
|
||||
"""
|
||||
match = req.environ['wsgiorg.routing_args'][1]
|
||||
if not match:
|
||||
return webob.exc.HTTPNotFound()
|
||||
app = match['controller']
|
||||
return app
|
||||
|
||||
|
||||
class Loader(object):
|
||||
"""Used to load WSGI applications from paste configurations."""
|
||||
|
||||
@ -272,6 +171,7 @@ class Loader(object):
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
wsgi.register_opts(CONF) # noqa
|
||||
config_path = config_path or CONF.api_paste_config
|
||||
self.config_path = utils.find_config(config_path)
|
||||
|
||||
|
@ -14,269 +14,47 @@
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import errno
|
||||
import os
|
||||
import socket
|
||||
import ssl
|
||||
import time
|
||||
|
||||
import eventlet
|
||||
import eventlet.wsgi
|
||||
import greenlet
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import service
|
||||
from oslo_utils import excutils
|
||||
from oslo_service import wsgi
|
||||
from oslo_utils import netutils
|
||||
|
||||
|
||||
from cinder import exception
|
||||
from cinder.i18n import _, _LE, _LI
|
||||
|
||||
|
||||
socket_opts = [
|
||||
cfg.BoolOpt('tcp_keepalive',
|
||||
default=True,
|
||||
help="Sets the value of TCP_KEEPALIVE (True/False) for each "
|
||||
"server socket."),
|
||||
cfg.IntOpt('tcp_keepidle',
|
||||
default=600,
|
||||
help="Sets the value of TCP_KEEPIDLE in seconds for each "
|
||||
"server socket. Not supported on OS X."),
|
||||
cfg.IntOpt('tcp_keepalive_interval',
|
||||
help="Sets the value of TCP_KEEPINTVL in seconds for each "
|
||||
"server socket. Not supported on OS X."),
|
||||
cfg.IntOpt('tcp_keepalive_count',
|
||||
help="Sets the value of TCP_KEEPCNT for each "
|
||||
"server socket. Not supported on OS X."),
|
||||
cfg.StrOpt('ssl_ca_file',
|
||||
help="CA certificate file to use to verify "
|
||||
"connecting clients"),
|
||||
cfg.StrOpt('ssl_cert_file',
|
||||
help="Certificate file to use when starting "
|
||||
"the server securely"),
|
||||
cfg.StrOpt('ssl_key_file',
|
||||
help="Private key file to use when starting "
|
||||
"the server securely"),
|
||||
]
|
||||
|
||||
eventlet_opts = [
|
||||
cfg.IntOpt('max_header_line',
|
||||
default=16384,
|
||||
help="Maximum line size of message headers to be accepted. "
|
||||
"max_header_line may need to be increased when using "
|
||||
"large tokens (typically those generated by the "
|
||||
"Keystone v3 API with big service catalogs)."),
|
||||
cfg.IntOpt('client_socket_timeout', default=900,
|
||||
help="Timeout for client connections\' socket operations. "
|
||||
"If an incoming connection is idle for this number of "
|
||||
"seconds it will be closed. A value of \'0\' means "
|
||||
"wait forever."),
|
||||
cfg.BoolOpt('wsgi_keep_alive',
|
||||
default=True,
|
||||
help='If False, closes the client socket connection '
|
||||
'explicitly. Setting it to True to maintain backward '
|
||||
'compatibility. Recommended setting is set it to False.'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(socket_opts)
|
||||
CONF.register_opts(eventlet_opts)
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Server(service.ServiceBase):
|
||||
class Server(wsgi.Server):
|
||||
"""Server class to manage a WSGI server, serving a WSGI application."""
|
||||
|
||||
default_pool_size = 1000
|
||||
|
||||
def __init__(self, name, app, host=None, port=None, pool_size=None,
|
||||
protocol=eventlet.wsgi.HttpProtocol, backlog=128):
|
||||
"""Initialize, but do not start, a WSGI server.
|
||||
|
||||
:param name: Pretty name for logging.
|
||||
:param app: The WSGI application to serve.
|
||||
:param host: IP address to serve the application.
|
||||
:param port: Port number to server the application.
|
||||
:param pool_size: Maximum number of eventlets to spawn concurrently.
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
# Allow operators to customize http requests max header line size.
|
||||
eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
|
||||
self.client_socket_timeout = CONF.client_socket_timeout or None
|
||||
self.name = name
|
||||
self.app = app
|
||||
self._host = host or "0.0.0.0"
|
||||
self._port = port or 0
|
||||
self._server = None
|
||||
self._socket = None
|
||||
self._protocol = protocol
|
||||
self.pool_size = pool_size or self.default_pool_size
|
||||
self._pool = eventlet.GreenPool(self.pool_size)
|
||||
self._logger = logging.getLogger("eventlet.wsgi.server")
|
||||
|
||||
if backlog < 1:
|
||||
raise exception.InvalidInput(
|
||||
reason='The backlog must be more than 1')
|
||||
|
||||
bind_addr = (host, port)
|
||||
# TODO(dims): eventlet's green dns/socket module does not actually
|
||||
# support IPv6 in getaddrinfo(). We need to get around this in the
|
||||
# future or monitor upstream for a fix
|
||||
try:
|
||||
info = socket.getaddrinfo(bind_addr[0],
|
||||
bind_addr[1],
|
||||
socket.AF_UNSPEC,
|
||||
socket.SOCK_STREAM)[0]
|
||||
family = info[0]
|
||||
bind_addr = info[-1]
|
||||
except Exception:
|
||||
family = socket.AF_INET
|
||||
|
||||
cert_file = CONF.ssl_cert_file
|
||||
key_file = CONF.ssl_key_file
|
||||
ca_file = CONF.ssl_ca_file
|
||||
self._use_ssl = cert_file or key_file
|
||||
|
||||
if cert_file and not os.path.exists(cert_file):
|
||||
raise RuntimeError(_("Unable to find cert_file : %s")
|
||||
% cert_file)
|
||||
|
||||
if ca_file and not os.path.exists(ca_file):
|
||||
raise RuntimeError(_("Unable to find ca_file : %s") % ca_file)
|
||||
|
||||
if key_file and not os.path.exists(key_file):
|
||||
raise RuntimeError(_("Unable to find key_file : %s")
|
||||
% key_file)
|
||||
|
||||
if self._use_ssl and (not cert_file or not key_file):
|
||||
raise RuntimeError(_("When running server in SSL mode, you "
|
||||
"must specify both a cert_file and "
|
||||
"key_file option value in your "
|
||||
"configuration file."))
|
||||
|
||||
retry_until = time.time() + 30
|
||||
while not self._socket and time.time() < retry_until:
|
||||
try:
|
||||
self._socket = eventlet.listen(bind_addr, backlog=backlog,
|
||||
family=family)
|
||||
except socket.error as err:
|
||||
if err.args[0] != errno.EADDRINUSE:
|
||||
raise
|
||||
eventlet.sleep(0.1)
|
||||
|
||||
if not self._socket:
|
||||
raise RuntimeError(_("Could not bind to %(host)s:%(port)s "
|
||||
"after trying for 30 seconds") %
|
||||
{'host': host, 'port': port})
|
||||
|
||||
(self._host, self._port) = self._socket.getsockname()[0:2]
|
||||
LOG.info(_LI("%(name)s listening on %(_host)s:%(_port)s"),
|
||||
{'name': self.name, '_host': self._host, '_port': self._port})
|
||||
|
||||
def start(self):
|
||||
"""Start serving a WSGI application.
|
||||
|
||||
:returns: None
|
||||
:raises: cinder.exception.InvalidInput
|
||||
|
||||
"""
|
||||
# The server socket object will be closed after server exits,
|
||||
# but the underlying file descriptor will remain open, and will
|
||||
# give bad file descriptor error. So duplicating the socket object,
|
||||
# to keep file descriptor usable.
|
||||
|
||||
dup_socket = self._socket.dup()
|
||||
dup_socket.setsockopt(socket.SOL_SOCKET,
|
||||
socket.SO_REUSEADDR, 1)
|
||||
def _set_socket_opts(self, _socket):
|
||||
_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
||||
# NOTE(praneshp): Call set_tcp_keepalive in oslo to set
|
||||
# tcp keepalive parameters. Sockets can hang around forever
|
||||
# without keepalive
|
||||
netutils.set_tcp_keepalive(dup_socket,
|
||||
CONF.tcp_keepalive,
|
||||
CONF.tcp_keepidle,
|
||||
CONF.tcp_keepalive_count,
|
||||
CONF.tcp_keepalive_interval)
|
||||
netutils.set_tcp_keepalive(_socket,
|
||||
self.conf.tcp_keepalive,
|
||||
self.conf.tcp_keepidle,
|
||||
self.conf.tcp_keepalive_count,
|
||||
self.conf.tcp_keepalive_interval)
|
||||
|
||||
if self._use_ssl:
|
||||
try:
|
||||
ssl_kwargs = {
|
||||
'server_side': True,
|
||||
'certfile': CONF.ssl_cert_file,
|
||||
'keyfile': CONF.ssl_key_file,
|
||||
'cert_reqs': ssl.CERT_NONE,
|
||||
}
|
||||
|
||||
if CONF.ssl_ca_file:
|
||||
ssl_kwargs['ca_certs'] = CONF.ssl_ca_file
|
||||
ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED
|
||||
|
||||
dup_socket = ssl.wrap_socket(dup_socket,
|
||||
**ssl_kwargs)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_LE("Failed to start %(name)s on %(_host)s: "
|
||||
"%(_port)s with SSL "
|
||||
"support."), self.__dict__)
|
||||
|
||||
wsgi_kwargs = {
|
||||
'func': eventlet.wsgi.server,
|
||||
'sock': dup_socket,
|
||||
'site': self.app,
|
||||
'protocol': self._protocol,
|
||||
'custom_pool': self._pool,
|
||||
'log': self._logger,
|
||||
'socket_timeout': self.client_socket_timeout,
|
||||
'keepalive': CONF.wsgi_keep_alive
|
||||
}
|
||||
|
||||
self._server = eventlet.spawn(**wsgi_kwargs)
|
||||
|
||||
@property
|
||||
def host(self):
|
||||
return self._host
|
||||
|
||||
@property
|
||||
def port(self):
|
||||
return self._port
|
||||
|
||||
def stop(self):
|
||||
"""Stop this server.
|
||||
|
||||
This is not a very nice action, as currently the method by which a
|
||||
server is stopped is by killing its eventlet.
|
||||
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
LOG.info(_LI("Stopping WSGI server."))
|
||||
if self._server is not None:
|
||||
# Resize pool to stop new requests from being processed
|
||||
self._pool.resize(0)
|
||||
self._server.kill()
|
||||
|
||||
def wait(self):
|
||||
"""Block, until the server has stopped.
|
||||
|
||||
Waits on the server's eventlet to finish, then returns.
|
||||
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
try:
|
||||
if self._server is not None:
|
||||
self._pool.waitall()
|
||||
self._server.wait()
|
||||
except greenlet.GreenletExit:
|
||||
LOG.info(_LI("WSGI server has stopped."))
|
||||
|
||||
def reset(self):
|
||||
"""Reset server greenpool size to default.
|
||||
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
self._pool.resize(self.pool_size)
|
||||
return _socket
|
||||
|
Loading…
x
Reference in New Issue
Block a user