Merge "[eventlet-removal] Remove the eventlet WSGI Neutron server code"

This commit is contained in:
Zuul
2025-07-01 16:28:45 +00:00
committed by Gerrit Code Review
9 changed files with 8 additions and 535 deletions

View File

@@ -175,6 +175,14 @@ synchronizer classes (``OvnNbSynchronizer``, ``OvnSbSynchronizer``).
synchronization. synchronization.
Removals and deprecations
-------------------------
The ``api.wsgi.Server`` class, based on ``eventlet``, used in the Neutron API
and the Metadata server to handle multiple WSGI sockets, is removed from the
repository.
References References
---------- ----------

View File

@@ -16,22 +16,14 @@
""" """
Utility methods for working with WSGI servers Utility methods for working with WSGI servers
""" """
import errno
import socket
import sys
import time
import eventlet.wsgi import eventlet.wsgi
from neutron_lib import context from neutron_lib import context
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as exception from neutron_lib import exceptions as exception
from oslo_config import cfg from oslo_config import cfg
import oslo_i18n import oslo_i18n
from oslo_log import log as logging from oslo_log import log as logging
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
from oslo_service import service as common_service
from oslo_service import sslutils from oslo_service import sslutils
from oslo_service import systemd
from oslo_service import wsgi from oslo_service import wsgi
from oslo_utils import encodeutils from oslo_utils import encodeutils
from oslo_utils import excutils from oslo_utils import excutils
@@ -99,142 +91,6 @@ class WorkerService(neutron_worker.NeutronBaseWorker):
config.reset_service() config.reset_service()
class Server:
"""Server class to manage multiple WSGI sockets and applications."""
def __init__(self, name, num_threads=None, disable_ssl=False):
# Raise the default from 8192 to accommodate large tokens
eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
self.num_threads = num_threads or CONF.wsgi_default_pool_size
self.disable_ssl = disable_ssl
# Pool for a greenthread in which wsgi server will be running
self.pool = eventlet.GreenPool(1)
self.name = name
self._server = None
# A value of 0 is converted to None because None is what causes the
# wsgi server to wait forever.
self.client_socket_timeout = CONF.client_socket_timeout or None
if CONF.use_ssl and not self.disable_ssl:
sslutils.is_enabled(CONF)
def _get_socket(self, host, port, backlog):
bind_addr = (host, port)
# TODO(dims): eventlet's green dns/socket module does not actually
# support IPv6 in getaddrinfo(). We need to get around this in the
# future or monitor upstream for a fix
try:
info = socket.getaddrinfo(bind_addr[0],
bind_addr[1],
socket.AF_UNSPEC,
socket.SOCK_STREAM)[0]
family = info[0]
bind_addr = info[-1]
except Exception:
LOG.exception("Unable to listen on %(host)s:%(port)s",
{'host': host, 'port': port})
sys.exit(1)
sock = None
retry_until = time.time() + CONF.retry_until_window
while not sock and time.time() < retry_until:
try:
sock = eventlet.listen(bind_addr,
backlog=backlog,
family=family)
except OSError as err:
with excutils.save_and_reraise_exception() as ctxt:
if err.errno == errno.EADDRINUSE:
ctxt.reraise = False
eventlet.sleep(0.1)
if not sock:
raise RuntimeError(_("Could not bind to %(host)s:%(port)s "
"after trying for %(time)d seconds") %
{'host': host,
'port': port,
'time': CONF.retry_until_window})
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# sockets can hang around forever without keepalive
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# This option isn't available in the OS X version of eventlet
if hasattr(socket, 'TCP_KEEPIDLE'):
sock.setsockopt(socket.IPPROTO_TCP,
socket.TCP_KEEPIDLE,
CONF.tcp_keepidle)
return sock
def start(self, application, port, host='0.0.0.0', workers=0, desc=None):
"""Run a WSGI server with the given application."""
self._host = host
self._port = port
backlog = CONF.backlog
self._socket = self._get_socket(self._host,
self._port,
backlog=backlog)
self._launch(application, workers, desc)
def _launch(self, application, workers=0, desc=None):
set_proctitle = "off" if desc is None else CONF.setproctitle
service = WorkerService(self, application, set_proctitle,
self.disable_ssl, workers, desc)
if workers < 1:
# The API service should run in the current process.
self._server = service
# Dump the initial option values
cfg.CONF.log_opt_values(LOG, logging.DEBUG)
service.start(desc=desc)
systemd.notify_once()
else:
# dispose the whole pool before os.fork, otherwise there will
# be shared DB connections in child processes which may cause
# DB errors.
db_api.get_context_manager().dispose_pool()
# The API service runs in a number of child processes.
# Minimize the cost of checking for child exit by extending the
# wait interval past the default of 0.01s.
self._server = common_service.ProcessLauncher(
cfg.CONF, wait_interval=1.0, restart_method='mutate')
self._server.launch_service(service,
workers=service.worker_process_count)
@property
def host(self):
return self._socket.getsockname()[0] if self._socket else self._host
@property
def port(self):
return self._socket.getsockname()[1] if self._socket else self._port
def stop(self):
self._server.stop()
def wait(self):
"""Wait until all servers have completed running."""
try:
self._server.wait()
except KeyboardInterrupt:
pass
def _run(self, application, socket):
"""Start a WSGI server in a new green thread."""
eventlet.wsgi.server(socket, application,
max_size=self.num_threads,
log=LOG,
keepalive=CONF.wsgi_keep_alive,
log_format=CONF.wsgi_log_format,
socket_timeout=self.client_socket_timeout,
debug=CONF.wsgi_server_debug)
@property
def process_launcher(self):
if isinstance(self._server, common_service.ProcessLauncher):
return self._server
return None
class Request(wsgi.Request): class Request(wsgi.Request):
def best_match_content_type(self): def best_match_content_type(self):

View File

@@ -1,18 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron import server
from neutron.server import wsgi_eventlet
def main():
server.boot_server(wsgi_eventlet.eventlet_wsgi_server)

View File

@@ -1,46 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from oslo_log import log
from neutron import service
LOG = log.getLogger(__name__)
def eventlet_wsgi_server():
neutron_api = service.serve_wsgi(service.NeutronApiService)
start_api_and_rpc_workers(neutron_api)
def start_api_and_rpc_workers(neutron_api):
try:
worker_launcher = service.start_all_workers(neutron_api)
pool = eventlet.GreenPool()
api_thread = pool.spawn(neutron_api.wait)
plugin_workers_thread = pool.spawn(worker_launcher.wait)
# api and other workers should die together. When one dies,
# kill the other.
api_thread.link(lambda gt: plugin_workers_thread.kill())
plugin_workers_thread.link(lambda gt: api_thread.kill())
pool.waitall()
except NotImplementedError:
LOG.info("RPC was already started in parent process by "
"plugin.")
neutron_api.wait()

View File

@@ -36,7 +36,6 @@ from oslo_utils import importutils
import psutil import psutil
from neutron._i18n import _ from neutron._i18n import _
from neutron.api import wsgi
from neutron.common import config from neutron.common import config
from neutron.common import profiler from neutron.common import profiler
from neutron.conf import service from neutron.conf import service
@@ -51,52 +50,6 @@ service.register_service_opts(service.RPC_EXTRA_OPTS)
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class WsgiService:
"""Base class for WSGI based services.
For each api you define, you must also define these flags:
:<api>_listen: The address on which to listen
:<api>_listen_port: The port on which to listen
"""
def __init__(self, app_name):
self.app_name = app_name
self.wsgi_app = None
def start(self):
self.wsgi_app = _run_wsgi(self.app_name)
def wait(self):
self.wsgi_app.wait()
class NeutronApiService(WsgiService):
"""Class for neutron-api service."""
def __init__(self, app_name):
profiler.setup('neutron-server', cfg.CONF.host)
super().__init__(app_name)
@classmethod
def create(cls, app_name='neutron'):
service = cls(app_name)
return service
def serve_wsgi(cls):
try:
service = cls.create()
service.start()
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception('Unrecoverable error: please check log '
'for details.')
registry.publish(resources.PROCESS, events.BEFORE_SPAWN, service)
return service
class RpcWorker(neutron_worker.NeutronBaseWorker): class RpcWorker(neutron_worker.NeutronBaseWorker):
"""Wraps a worker to be handled by ProcessLauncher""" """Wraps a worker to be handled by ProcessLauncher"""
start_listeners_method = 'start_rpc_listeners' start_listeners_method = 'start_rpc_listeners'
@@ -317,13 +270,6 @@ def _start_workers(workers, neutron_api=None):
'details.') 'details.')
def start_all_workers(neutron_api=None):
workers = _get_rpc_workers() + _get_plugins_workers()
launcher = _start_workers(workers, neutron_api)
registry.publish(resources.PROCESS, events.AFTER_SPAWN, None)
return launcher
def start_rpc_workers(): def start_rpc_workers():
rpc_workers = _get_rpc_workers() rpc_workers = _get_rpc_workers()
LOG.debug('Using launcher for rpc, workers=%s (configured rpc_workers=%s)', LOG.debug('Using launcher for rpc, workers=%s (configured rpc_workers=%s)',
@@ -362,23 +308,6 @@ def _get_api_workers():
return workers return workers
def _run_wsgi(app_name):
app = config.load_paste_app(app_name)
if not app:
LOG.error('No known API applications configured.')
return
return run_wsgi_app(app)
def run_wsgi_app(app):
server = wsgi.Server("Neutron")
server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
workers=_get_api_workers(), desc="api worker")
LOG.info("Neutron service started, listening on %(host)s:%(port)s",
{'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port})
return server
class Service(n_rpc.Service): class Service(n_rpc.Service):
"""Service object for binaries running on hosts. """Service object for binaries running on hosts.

View File

@@ -21,13 +21,11 @@ import signal
import traceback import traceback
from unittest import mock from unittest import mock
import httplib2
from neutron_lib import worker as neutron_worker from neutron_lib import worker as neutron_worker
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
import psutil import psutil
from neutron.api import wsgi
from neutron.common import utils from neutron.common import utils
from neutron import manager from neutron import manager
from neutron import service from neutron import service
@@ -193,64 +191,6 @@ class TestNeutronServer(base.BaseLoggingTestCase,
self.assertEqual(expected_msg, ret_msg) self.assertEqual(expected_msg, ret_msg)
class TestWsgiServer(TestNeutronServer):
"""Tests for neutron.api.wsgi.Server."""
def setUp(self):
super().setUp()
self.port = None
@staticmethod
def application(environ, start_response):
"""A primitive test application."""
response_body = 'Response'
status = '200 OK'
response_headers = [('Content-Type', 'text/plain'),
('Content-Length', str(len(response_body)))]
start_response(status, response_headers)
return [response_body]
def _check_active(self):
"""Check a wsgi service is active by making a GET request."""
port = int(os.read(self.pipein, 5))
conn = httplib2.HTTPConnectionWithTimeout("localhost", port)
try:
conn.request("GET", "/")
resp = conn.getresponse()
return resp.status == 200
except OSError:
return False
def _run_wsgi(self, workers=1):
"""Start WSGI server with a test application."""
# Mock start method to check that children are started again on
# receiving SIGHUP.
with mock.patch(
"neutron.api.wsgi.WorkerService.start"
) as start_method, mock.patch(
"neutron.api.wsgi.WorkerService.reset"
) as reset_method:
start_method.side_effect = self._fake_start
reset_method.side_effect = self._fake_reset
server = wsgi.Server("Test")
server.start(self.application, 0, "0.0.0.0",
workers=workers)
# Memorize a port that was chosen for the service
self.port = server.port
os.write(self.pipeout, bytes(str(self.port), 'utf-8'))
server.wait()
@tests_base.unstable_test('bug 1930367')
def test_restart_wsgi_on_sighup_multiple_workers(self):
self._test_restart_service_on_sighup(service=self._run_wsgi,
workers=2)
class TestRPCServer(TestNeutronServer): class TestRPCServer(TestNeutronServer):
"""Tests for neutron RPC server.""" """Tests for neutron RPC server."""

View File

@@ -13,16 +13,11 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import os
import socket
import ssl
from unittest import mock from unittest import mock
import urllib
from neutron_lib.db import api as db_api from neutron_lib.db import api as db_api
from neutron_lib import exceptions as exception from neutron_lib import exceptions as exception
from oslo_config import cfg from oslo_config import cfg
from oslo_utils import netutils
import testtools import testtools
import webob import webob
import webob.exc import webob.exc
@@ -32,25 +27,6 @@ from neutron.tests import base
CONF = cfg.CONF CONF = cfg.CONF
TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
'..', 'var'))
def open_no_proxy(*args, **kwargs):
# NOTE(jamespage):
# Deal with more secure certification chain verification
# introduced in python 2.7.9 under PEP-0476
# https://github.com/python/peps/blob/master/pep-0476.txt
if hasattr(ssl, "_create_unverified_context"):
opener = urllib.request.build_opener(
urllib.request.ProxyHandler({}),
urllib.request.HTTPSHandler(
context=ssl._create_unverified_context())
)
else:
opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
return opener.open(*args, **kwargs)
class TestWorkerService(base.BaseTestCase): class TestWorkerService(base.BaseTestCase):
"""WorkerService tests.""" """WorkerService tests."""
@@ -81,136 +57,6 @@ class TestWorkerService(base.BaseTestCase):
self._test_reset(worker_service) self._test_reset(worker_service)
class TestWSGIServer(base.BaseTestCase):
"""WSGI server tests."""
def test_start_random_port(self):
server = wsgi.Server("test_random_port")
server.start(None, 0, host="127.0.0.1")
self.assertNotEqual(0, server.port)
server.stop()
server.wait()
@mock.patch('oslo_service.service.ProcessLauncher')
def test_start_multiple_workers(self, ProcessLauncher):
launcher = ProcessLauncher.return_value
server = wsgi.Server("test_multiple_processes")
server.start(None, 0, host="127.0.0.1", workers=2)
launcher.launch_service.assert_called_once_with(mock.ANY, workers=2)
server.stop()
launcher.stop.assert_called_once_with()
server.wait()
launcher.wait.assert_called_once_with()
@testtools.skipIf(
not netutils.is_ipv6_enabled(),
'IPv6 support disabled on host')
def test_start_random_port_with_ipv6(self):
server = wsgi.Server("test_random_port")
server.start(None, 0, host="::1")
self.assertEqual("::1", server.host)
self.assertNotEqual(0, server.port)
server.stop()
server.wait()
def test_ipv6_listen_called_with_scope(self):
server = wsgi.Server("test_app")
with mock.patch.object(wsgi.eventlet, 'listen') as mock_listen:
with mock.patch.object(socket, 'getaddrinfo') as mock_get_addr:
mock_get_addr.return_value = [
(socket.AF_INET6,
socket.SOCK_STREAM,
socket.IPPROTO_TCP,
'',
('fe80::204:acff:fe96:da87%eth0', 1234, 0, 2))
]
with mock.patch.object(server, 'pool') as mock_pool:
server.start(None,
1234,
host="fe80::204:acff:fe96:da87%eth0")
mock_get_addr.assert_called_once_with(
"fe80::204:acff:fe96:da87%eth0",
1234,
socket.AF_UNSPEC,
socket.SOCK_STREAM
)
mock_listen.assert_called_once_with(
('fe80::204:acff:fe96:da87%eth0', 1234, 0, 2),
family=socket.AF_INET6,
backlog=cfg.CONF.backlog
)
mock_pool.spawn.assert_has_calls([
mock.call(
server._run,
None,
mock_listen.return_value.dup.return_value)
])
def test_app(self):
greetings = b'Hello, World!!!'
def hello_world(env, start_response):
if env['PATH_INFO'] != '/':
start_response('404 Not Found',
[('Content-Type', 'text/plain')])
return ['Not Found\r\n']
start_response('200 OK', [('Content-Type', 'text/plain')])
return [greetings]
server = wsgi.Server("test_app")
server.start(hello_world, 0, host="127.0.0.1")
response = open_no_proxy('http://127.0.0.1:%d/' % server.port)
self.assertEqual(greetings, response.read())
server.stop()
def test_disable_ssl(self):
CONF.set_default('use_ssl', True)
greetings = 'Hello, World!!!'
def hello_world(env, start_response):
if env['PATH_INFO'] != '/':
start_response('404 Not Found',
[('Content-Type', 'text/plain')])
return ['Not Found\r\n']
start_response('200 OK', [('Content-Type', 'text/plain')])
return [greetings]
server = wsgi.Server("test_app", disable_ssl=True)
server.start(hello_world, 0, host="127.0.0.1")
response = open_no_proxy('http://127.0.0.1:%d/' % server.port)
self.assertEqual(greetings.encode('utf-8'), response.read())
server.stop()
@mock.patch.object(wsgi, 'eventlet')
def test__run(self, eventlet_mock):
server = wsgi.Server('test')
server._run("app", "socket")
eventlet_mock.wsgi.server.assert_called_once_with(
'socket',
'app',
max_size=server.num_threads,
log=mock.ANY,
keepalive=CONF.wsgi_keep_alive,
log_format=CONF.wsgi_log_format,
socket_timeout=server.client_socket_timeout,
debug=False,
)
class SerializerTest(base.BaseTestCase): class SerializerTest(base.BaseTestCase):
def test_serialize_unknown_content_type(self): def test_serialize_unknown_content_type(self):
"""Verify that exception InvalidContentType is raised.""" """Verify that exception InvalidContentType is raised."""

View File

@@ -15,9 +15,6 @@
from unittest import mock from unittest import mock
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from oslo_concurrency import processutils from oslo_concurrency import processutils
from oslo_config import cfg from oslo_config import cfg
@@ -82,41 +79,3 @@ class TestRunRpcWorkers(base.BaseTestCase):
def test_rpc_workers_defined(self): def test_rpc_workers_defined(self):
self._test_rpc_workers(42, 42) self._test_rpc_workers(42, 42)
class TestRunWsgiApp(base.BaseTestCase):
def setUp(self):
super().setUp()
self.worker_count = service._get_worker_count()
def _test_api_workers(self, config_value, expected_passed_value):
if config_value is not None:
cfg.CONF.set_override('api_workers', config_value)
with mock.patch('neutron.api.wsgi.Server') as mock_server:
service.run_wsgi_app(mock.sentinel.app)
start_call = mock_server.return_value.start.call_args
expected_call = mock.call(
mock.ANY, mock.ANY, mock.ANY, desc='api worker',
workers=expected_passed_value)
self.assertEqual(expected_call, start_call)
def test_api_workers_one(self):
self._test_api_workers(1, 1)
def test_api_workers_default(self):
self._test_api_workers(None, self.worker_count)
def test_api_workers_defined(self):
self._test_api_workers(42, 42)
def test_start_all_workers(self):
cfg.CONF.set_override('api_workers', 1)
mock.patch.object(service, '_get_rpc_workers').start()
mock.patch.object(service, '_get_plugins_workers').start()
mock.patch.object(service, '_start_workers').start()
callback = mock.Mock()
registry.subscribe(callback, resources.PROCESS, events.AFTER_SPAWN)
service.start_all_workers()
callback.assert_called_once_with(
resources.PROCESS, events.AFTER_SPAWN, mock.ANY, payload=None)

View File

@@ -42,7 +42,6 @@ console_scripts =
neutron-netns-cleanup = neutron.cmd.netns_cleanup:main neutron-netns-cleanup = neutron.cmd.netns_cleanup:main
neutron-openvswitch-agent = neutron.cmd.agents.ovs_neutron_agent:main neutron-openvswitch-agent = neutron.cmd.agents.ovs_neutron_agent:main
neutron-ovs-cleanup = neutron.cmd.ovs_cleanup:main neutron-ovs-cleanup = neutron.cmd.ovs_cleanup:main
neutron-server = neutron.cmd.eventlet.server:main
neutron-rpc-server = neutron.cmd.server:main_rpc neutron-rpc-server = neutron.cmd.server:main_rpc
neutron-rootwrap = oslo_rootwrap.cmd:main neutron-rootwrap = oslo_rootwrap.cmd:main
neutron-rootwrap-daemon = oslo_rootwrap.cmd:daemon neutron-rootwrap-daemon = oslo_rootwrap.cmd:daemon